Commit f938c26c authored by Philip Carns's avatar Philip Carns
Browse files

rpc breadcrumb impl with new margo_request type

- first draft, not complete
parent b2e49062
......@@ -60,6 +60,7 @@ int main(int argc, char **argv)
fprintf(stderr, "Error: margo_init()\n");
return(-1);
}
margo_diag_start(mid);
/* register core RPC */
my_rpc_shutdown_id = MARGO_REGISTER(mid, "my_shutdown_rpc",
......@@ -145,6 +146,7 @@ int main(int argc, char **argv)
margo_addr_free(mid, data_xfer_svr_addr);
/* shut down everything */
margo_diag_dump(mid, "-", 0);
margo_finalize(mid);
free(buffer);
......
......@@ -42,6 +42,7 @@ static void my_rpc_shutdown_ult(hg_handle_t handle)
* margo_wait_for_finalize() to suspend until this RPC executes, so there
* is no need to send any extra signal to notify it.
*/
margo_diag_dump(mid, "-", 0);
margo_finalize(mid);
return;
......@@ -80,6 +81,7 @@ int main(int argc, char **argv)
fprintf(stderr, "Error: margo_init()\n");
return(-1);
}
margo_diag_start(mid);
/* figure out what address this server is listening on */
hret = margo_addr_self(mid, &addr_self);
......
......@@ -922,6 +922,15 @@ void __margo_internal_incr_pending(margo_instance_id mid);
*/
void __margo_internal_decr_pending(margo_instance_id mid);
/**
* @private
* Internal function used by DEFINE_MARGO_RPC_HANDLER, not supposed to be
* called by users!
*
* @param rpc_breadcrumb RPC tracking breadcrumb
*/
void __margo_internal_breadcrumb_handler_set(uint64_t rpc_breadcrumb);
/**
* macro that registers a function as an RPC.
*/
......@@ -944,11 +953,22 @@ void __margo_internal_decr_pending(margo_instance_id mid);
/**
* macro that defines a function to glue an RPC handler to a ult handler
* @param [in] __name name of handler function
*
* Note: we use this opportunity to set a thread-local argobots key that stores
* the "breadcrumb" that was set in the RPC. It is shifted down 16 bits so that
* if this handler in turn issues more RPCs there will be a stack showing their
* ancestry.
*/
#define DEFINE_MARGO_RPC_HANDLER(__name) \
void __name##_wrapper(hg_handle_t handle) { \
margo_instance_id __mid; \
hg_return_t __ret; \
uint64_t *__rpc_breadcrumb; \
__mid = margo_hg_handle_get_instance(handle); \
__ret = HG_Get_input_buf(handle, (void**)&__rpc_breadcrumb, NULL); \
assert(__ret == HG_SUCCESS); \
*__rpc_breadcrumb = le64toh(*__rpc_breadcrumb); \
__margo_internal_breadcrumb_handler_set((*__rpc_breadcrumb) << 16); \
__name(handle); \
__margo_internal_decr_pending(__mid); \
if(__margo_internal_finalize_requested(__mid)) { \
......
/*
* (C) 2015 The University of Chicago
*
......@@ -14,6 +13,7 @@
#include <margo-config.h>
#include <time.h>
#include <math.h>
#include <endian.h>
#include "margo.h"
#include "margo-timer.h"
......@@ -23,14 +23,26 @@
#define DEFAULT_MERCURY_PROGRESS_TIMEOUT_UB 100 /* 100 milliseconds */
#define DEFAULT_MERCURY_HANDLE_CACHE_SIZE 32
/* Structure to store timing information */
struct diag_data
{
double min;
double max;
double cumulative;
int count;
/* the following fields are only used when this structure is used to track
* rpc timing information
*/
uint64_t rpc_breadcrumb; /* identifier for rpc and it's ancestors */
UT_hash_handle hh; /* hash table link */
};
/* key for Argobots thread-local storage to track RPC breadcrumbs across thread
* execution
*/
static ABT_key rpc_breadcrumb_key = ABT_KEY_NULL;
#define __DIAG_UPDATE(__data, __time)\
do {\
__data.count++; \
......@@ -55,6 +67,17 @@ struct margo_finalize_cb
struct margo_timer_list; /* defined in margo-timer.c */
/* Stores the name and rpc id of a registered RPC. We track this purely for
* debugging and instrumentation purposes
*/
struct margo_registered_rpc
{
hg_id_t id; /* rpc id */
uint64_t rpc_breadcrumb_fragment; /* fragment id used in rpc tracing */
char func_name[64]; /* string name of rpc */
struct margo_registered_rpc *next; /* pointer to next in list */
};
struct margo_instance
{
/* mercury/argobots state */
......@@ -74,6 +97,9 @@ struct margo_instance
int num_handler_pool_threads;
unsigned int hg_progress_timeout_ub;
/* list of rpcs registered on this instance for debugging and profiling purposes */
struct margo_registered_rpc *registered_rpcs;
/* control logic for callers waiting on margo to be finalized */
int finalize_flag;
int refcount;
......@@ -109,12 +135,16 @@ struct margo_instance
struct diag_data diag_progress_elapsed_zero_timeout;
struct diag_data diag_progress_elapsed_nonzero_timeout;
struct diag_data diag_progress_timeout_value;
struct diag_data *diag_rpc;
ABT_mutex diag_rpc_mutex;
};
struct margo_request_struct {
ABT_eventual eventual;
margo_timer_t* timer;
hg_handle_t handle;
double start_time; /* timestamp of when the operation started */
uint64_t rpc_breadcrumb; /* statistics tracking identifier, if applicable */
};
struct margo_rpc_data
......@@ -129,6 +159,8 @@ MERCURY_GEN_PROC(margo_shutdown_out_t, ((int32_t)(ret)))
static void hg_progress_fn(void* foo);
static void margo_rpc_data_free(void* ptr);
static uint64_t margo_breadcrumb_set(hg_id_t rpc_id);
static void margo_breadcrumb_measure(margo_instance_id mid, uint64_t rpc_breadcrumb, double start);
static void remote_shutdown_ult(hg_handle_t handle);
DECLARE_MARGO_RPC_HANDLER(remote_shutdown_ult);
......@@ -358,6 +390,11 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
hg_return_t hret;
struct margo_instance *mid;
/* set input offset to include breadcrumb information in Mercury requests */
hret = HG_Class_set_input_offset(HG_Context_get_class(hg_context), sizeof(uint64_t));
/* this should not ever fail */
assert(hret == HG_SUCCESS);
mid = calloc(1,sizeof(*mid));
if(!mid) goto err;
memset(mid, 0, sizeof(*mid));
......@@ -379,6 +416,8 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
ABT_mutex_create(&mid->pending_operations_mtx);
mid->finalize_requested = 0;
ABT_mutex_create(&mid->diag_rpc_mutex);
mid->timer_list = margo_timer_list_create();
if(mid->timer_list == NULL) goto err;
......@@ -386,6 +425,15 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
hret = margo_handle_cache_init(mid);
if(hret != HG_SUCCESS) goto err;
/* register thread local key to track RPC breadcrumbs across threads */
/* NOTE: we are registering a global key, even though init could be called
* multiple times for different margo instances. As of May 2019 this doesn't
* seem to be a problem to call ABT_key_create() multiple times.
*/
ret = ABT_key_create(free, &rpc_breadcrumb_key);
if(ret != 0)
goto err;
ret = ABT_thread_create(mid->progress_pool, hg_progress_fn, mid,
ABT_THREAD_ATTR_NULL, &mid->hg_progress_tid);
if(ret != 0) goto err;
......@@ -403,6 +451,7 @@ err:
ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond);
ABT_mutex_free(&mid->pending_operations_mtx);
ABT_mutex_free(&mid->diag_rpc_mutex);
free(mid);
}
return MARGO_INSTANCE_NULL;
......@@ -411,6 +460,7 @@ err:
static void margo_cleanup(margo_instance_id mid)
{
int i;
struct margo_registered_rpc *next_rpc;
/* call finalize callbacks */
struct margo_finalize_cb* fcb = mid->finalize_cb;
......@@ -426,6 +476,7 @@ static void margo_cleanup(margo_instance_id mid)
ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond);
ABT_mutex_free(&mid->pending_operations_mtx);
ABT_mutex_free(&mid->diag_rpc_mutex);
if (mid->owns_progress_pool)
{
......@@ -445,6 +496,11 @@ static void margo_cleanup(margo_instance_id mid)
margo_handle_cache_destroy(mid);
/* TODO: technically we could/should call ABT_key_free() for
* rpc_breadcrumb_key. We can't do that here, though, because the key is
* global, not local to this mid.
*/
if (mid->margo_init)
{
if (mid->hg_context)
......@@ -455,6 +511,13 @@ static void margo_cleanup(margo_instance_id mid)
ABT_finalize();
}
while(mid->registered_rpcs)
{
next_rpc = mid->registered_rpcs->next;
free(mid->registered_rpcs);
mid->registered_rpcs = next_rpc;
}
free(mid);
}
......@@ -586,14 +649,38 @@ hg_id_t margo_provider_register_name(margo_instance_id mid, const char *func_nam
{
hg_id_t id;
int ret;
struct margo_registered_rpc * tmp_rpc;
assert(provider_id <= MARGO_MAX_PROVIDER_ID);
id = gen_id(func_name, provider_id);
if(mid->diag_enabled)
{
/* track information about this rpc registration for debugging and
* profiling
*/
tmp_rpc = calloc(1, sizeof(*tmp_rpc));
if(!tmp_rpc)
return(0);
tmp_rpc->id = id;
tmp_rpc->rpc_breadcrumb_fragment = id >> (__MARGO_PROVIDER_ID_SIZE*8);
tmp_rpc->rpc_breadcrumb_fragment &= 0xffff;
strncpy(tmp_rpc->func_name, func_name, 63);
tmp_rpc->next = mid->registered_rpcs;
mid->registered_rpcs = tmp_rpc;
}
ret = margo_register_internal(mid, id, in_proc_cb, out_proc_cb, rpc_cb, pool);
if(ret == 0)
{
if(mid->diag_enabled)
{
mid->registered_rpcs = tmp_rpc->next;
free(tmp_rpc);
}
return(0);
}
return(id);
}
......@@ -794,6 +881,7 @@ static hg_return_t margo_cb(const struct hg_cb_info *info)
{
hg_return_t hret = info->ret;
margo_request req = (margo_request)(info->arg);
margo_instance_id mid;
if(hret == HG_CANCELED && req->timer) {
hret = HG_TIMEOUT;
......@@ -808,6 +896,16 @@ static hg_return_t margo_cb(const struct hg_cb_info *info)
free(req->timer);
}
if(req->rpc_breadcrumb != 0)
{
/* This is the callback from an HG_Forward call. Track RPC timing
* information.
*/
mid = margo_hg_handle_get_instance(req->handle);
assert(mid);
margo_breadcrumb_measure(mid, req->rpc_breadcrumb, req->start_time);
}
/* propagate return code out through eventual */
ABT_eventual_set(req->eventual, &hret, sizeof(hret));
......@@ -854,6 +952,7 @@ static hg_return_t margo_provider_iforward_internal(
hg_proc_cb_t in_cb, out_cb;
hg_bool_t flag;
margo_instance_id mid = margo_hg_handle_get_instance(handle);
uint64_t *rpc_breadcrumb;
assert(provider_id <= MARGO_MAX_PROVIDER_ID);
......@@ -916,6 +1015,17 @@ static hg_return_t margo_provider_iforward_internal(
req, timeout_ms);
}
/* add rpc breadcrumb to outbound request; this will be used to track
* rpc statistics.
*/
ret = HG_Get_input_buf(handle, (void**)&rpc_breadcrumb, NULL);
if(ret != HG_SUCCESS)
return(ret);
req->rpc_breadcrumb = margo_breadcrumb_set(hgi->id);
/* LE encoding */
*rpc_breadcrumb = htole64(req->rpc_breadcrumb);
req->start_time = ABT_get_wtime();
return HG_Forward(handle, margo_cb, (void*)req, in_struct);
}
......@@ -996,6 +1106,8 @@ static hg_return_t margo_irespond_internal(
}
req->handle = handle;
req->timer = NULL;
req->start_time = ABT_get_wtime();
req->rpc_breadcrumb = 0;
return HG_Respond(handle, margo_cb, (void*)req, out_struct);
}
......@@ -1079,6 +1191,8 @@ static hg_return_t margo_bulk_itransfer_internal(
}
req->timer = NULL;
req->handle = HG_HANDLE_NULL;
req->start_time = ABT_get_wtime();
req->rpc_breadcrumb = 0;
hret = HG_Bulk_transfer(mid->hg_context, margo_cb,
(void*)req, op, origin_addr, origin_handle, origin_offset, local_handle,
......@@ -1422,6 +1536,9 @@ void margo_diag_dump(margo_instance_id mid, const char* file, int uniquify)
FILE *outfile;
time_t ltime;
char revised_file_name[256] = {0};
struct diag_data *dd, *tmp;
char rpc_breadcrumb_str[24] = {0};
struct margo_registered_rpc *tmp_rpc;
assert(mid->diag_enabled);
......@@ -1462,6 +1579,13 @@ void margo_diag_dump(margo_instance_id mid, const char* file, int uniquify)
time(&ltime);
fprintf(outfile, "# Margo diagnostics\n");
fprintf(outfile, "# %s\n", ctime(&ltime));
fprintf(outfile, "# RPC breadcrumbs for RPCs that were registered on this process:\n");
tmp_rpc = mid->registered_rpcs;
while(tmp_rpc)
{
fprintf(outfile, "# 0x%4lx\t%s\n", tmp_rpc->rpc_breadcrumb_fragment, tmp_rpc->func_name);
tmp_rpc = tmp_rpc->next;
}
fprintf(outfile, "# <stat>\t<avg>\t<min>\t<max>\t<count>\n");
print_diag_data(outfile, "trigger_elapsed",
"Time consumed by HG_Trigger()",
......@@ -1475,6 +1599,28 @@ void margo_diag_dump(margo_instance_id mid, const char* file, int uniquify)
print_diag_data(outfile, "progress_timeout_value",
"Timeout values passed to HG_Progress()",
&mid->diag_progress_timeout_value);
HASH_ITER(hh, mid->diag_rpc, dd, tmp)
{
int i;
uint64_t tmp_breadcrumb;
for(i=0; i<4; i++)
{
tmp_breadcrumb = dd->rpc_breadcrumb;
tmp_breadcrumb >>= (i*16);
tmp_breadcrumb &= 0xffff;
if(i==4)
sprintf(&rpc_breadcrumb_str[i*7], "0x%.4lx", tmp_breadcrumb);
else
sprintf(&rpc_breadcrumb_str[i*7], "0x%.4lx ", tmp_breadcrumb);
#if 0
if(i==0)
sprintf(&rpc_breadcrumb_str[(3-i)*5], "0x%4lx ", tmp_breadcrumb);
else
sprintf(&rpc_breadcrumb_str[(3-i)*5], "0x%4lx", tmp_breadcrumb);
#endif
}
print_diag_data(outfile, rpc_breadcrumb_str, "RPC statistics", dd);
}
if(outfile != stdout)
fclose(outfile);
......@@ -1700,3 +1846,110 @@ void __margo_internal_decr_pending(margo_instance_id mid)
mid->pending_operations -= 1;
ABT_mutex_unlock(mid->pending_operations_mtx);
}
/* sets the value of a breadcrumb, to be called just before issuing an RPC */
static uint64_t margo_breadcrumb_set(hg_id_t rpc_id)
{
uint64_t *val;
uint64_t tmp;
ABT_key_get(rpc_breadcrumb_key, (void**)(&val));
if(val == NULL)
{
/* key not set yet on this ULT; we need to allocate a new one
* with all zeroes for initial value of breadcrumb and idx
*/
/* NOTE: treating this as best effort; just return 0 if it fails */
val = calloc(1, sizeof(*val));
if(!val)
return(0);
}
/* NOTE: an rpc_id (after mux'ing) has provider in low order bits and
* base rpc_id in high order bits. After demuxing, a base_id has zeroed
* out low bits. So regardless of whether the rpc_id is a base_id or a
* mux'd id, either way we need to shift right to get either the
* provider id (or the space reserved for it) out of the way, then mask
* off 16 bits for use as a breadcrumb.
*/
tmp = rpc_id >> (__MARGO_PROVIDER_ID_SIZE*8);
tmp &= 0xffff;
/* clear low 16 bits of breadcrumb */
*val = (*val >> 16) << 16;
/* combine them, so that we have low order 16 of rpc id and high order
* bits of previous breadcrumb */
*val |= tmp;
ABT_key_set(rpc_breadcrumb_key, val);
return *val;
}
/* records statistics for a breadcrumb, to be used after completion of an
* RPC */
static void margo_breadcrumb_measure(margo_instance_id mid, uint64_t rpc_breadcrumb, double start)
{
struct diag_data *stat;
double end, elapsed;
if(!mid->diag_enabled)
return;
end = ABT_get_wtime();
elapsed = end-start;
ABT_mutex_lock(mid->diag_rpc_mutex);
HASH_FIND(hh, mid->diag_rpc, &rpc_breadcrumb,
sizeof(rpc_breadcrumb), stat);
if(!stat)
{
/* we aren't tracking this breadcrumb yet; add it */
stat = calloc(1, sizeof(*stat));
if(!stat)
{
/* best effort; we return gracefully without recording stats if this
* happens.
*/
ABT_mutex_unlock(mid->diag_rpc_mutex);
return;
}
stat->rpc_breadcrumb = rpc_breadcrumb;
HASH_ADD(hh, mid->diag_rpc, rpc_breadcrumb,
sizeof(rpc_breadcrumb), stat);
}
stat->count++;
stat->cumulative += elapsed;
if(elapsed > stat->max)
stat->max = elapsed;
if(stat->min == 0 || elapsed < stat->min)
stat->min = elapsed;
ABT_mutex_unlock(mid->diag_rpc_mutex);
return;
}
void __margo_internal_breadcrumb_handler_set(uint64_t rpc_breadcrumb)
{
uint64_t *val;
ABT_key_get(rpc_breadcrumb_key, (void**)(&val));
if(val == NULL)
{
/* key not set yet on this ULT; we need to allocate a new one */
/* best effort; just return and don't set it if we can't allocate memory */
val = malloc(sizeof(*val));
if(!val)
return;
}
*val = rpc_breadcrumb;
ABT_key_set(rpc_breadcrumb_key, val);
return;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment