Commit 313a5c92 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

done refactoring

parent 143099c4
......@@ -19,8 +19,7 @@ BUILT_SOURCES =
include_HEADERS = \
include/margo.h \
include/margo-logging.h \
include/margo-bulk-pool.h \
include/margo-diag.h
include/margo-bulk-pool.h
TESTS_ENVIRONMENT =
......
......@@ -19,46 +19,22 @@
extern "C" {
#endif
#define GET_SELF_ADDR_STR(__mid, __addr_str) do { \
hg_addr_t __self_addr; \
hg_size_t __size; \
__addr_str = NULL; \
if (margo_addr_self(__mid, &__self_addr) != HG_SUCCESS) break; \
if (margo_addr_to_string(__mid, NULL, &__size, __self_addr) != HG_SUCCESS) { \
margo_addr_free(__mid, __self_addr); \
break; \
} \
if ((__addr_str = malloc(__size)) == NULL) { \
margo_addr_free(__mid, __self_addr); \
break; \
} \
if (margo_addr_to_string(__mid, __addr_str, &__size, __self_addr) != HG_SUCCESS) { \
free(__addr_str); \
__addr_str = NULL; \
margo_addr_free(__mid, __self_addr); \
break; \
} \
margo_addr_free(__mid, __self_addr); \
} while(0)
/******************************************************************************************************/
/* used to identify a globally unique breadcrumb */
struct global_breadcrumb_key
struct margo_global_breadcrumb_key
{
uint64_t rpc_breadcrumb; /* a.k.a RPC callpath */
uint64_t addr_hash; /* hash of server addr */
uint16_t provider_id; /* provider_id within a server. NOT a globally unique identifier */
uint64_t rpc_breadcrumb; /* a.k.a RPC callpath */
uint64_t addr_hash; /* hash of server addr */
uint16_t provider_id; /* provider_id within a server. NOT a globally unique identifier */
};
enum breadcrumb_type
enum margo_breadcrumb_type
{
origin, target
origin, target
};
typedef enum breadcrumb_type breadcrumb_type;
typedef enum margo_breadcrumb_type margo_breadcrumb_type;
struct breadcrumb_stats
struct margo_breadcrumb_stats
{
/* stats for breadcrumb call times */
double min;
......@@ -79,17 +55,17 @@ struct breadcrumb_stats
unsigned long count;
};
typedef struct breadcrumb_stats breadcrumb_stats;
typedef struct margo_breadcrumb_stats margo_breadcrumb_stats;
/* structure to store breadcrumb snapshot, for consumption outside of margo.
reflects the margo-internal structure used to hold diagnostic data */
struct margo_breadcrumb
{
breadcrumb_stats stats;
margo_breadcrumb_stats stats;
/* 0 is this is a origin-side breadcrumb, 1 if this is a target-side breadcrumb */
breadcrumb_type type;
margo_breadcrumb_type type;
struct global_breadcrumb_key key;
struct margo_global_breadcrumb_key key;
struct margo_breadcrumb* next;
};
......@@ -97,7 +73,7 @@ struct margo_breadcrumb
/* snapshot contains linked list of breadcrumb data */
struct margo_breadcrumb_snapshot
{
struct margo_breadcrumb* ptr;
struct margo_breadcrumb* ptr;
};
......
......@@ -16,7 +16,6 @@ extern "C" {
#include <mercury_bulk.h>
#include <mercury_macros.h>
#include <abt.h>
#include <margo-diag.h>
#define DEPRECATED(msg) __attribute__((deprecated(msg)))
......@@ -164,6 +163,30 @@ margo_instance_id margo_init(
int use_progress_thread,
int rpc_thread_count);
/**
* Initializes a margo instance using a margo_init_info struct to provide arguments.
*
* @param args Arguments
* @param address Address or protocol
* @param mode MARGO_CLIENT_MODE or MARGO_SERVER_MODE
*
* @return a margo_instance_id or MARGO_INSTANCE_NULL in case of failure.
*
* NOTE: if you are configuring Argobots pools yourself before
* passing them into this function, please consider setting
* ABT_MEM_MAX_NUM_STACKS to a low value (like 8) either in your
* environment or programmatically with putenv() in your code before
* creating the pools to prevent excess memory consumption under
* load from producer/consumer patterns across execution streams that
* fail to utilize per-execution stream stack caches. See
* https://xgitlab.cels.anl.gov/sds/margo/issues/40 for details.
* The margo_init() function does this automatically.
*/
margo_instance_id margo_init_ext(
const char* address,
int mode,
const struct margo_init_info* args);
/**
* Initializes margo library with custom Mercury options.
* @param [in] addr_str Mercury host address with port number
......@@ -197,11 +220,6 @@ margo_instance_id margo_init_opt(
int use_progress_thread,
int rpc_thread_count) DEPRECATED("use margo_init_ext instead");
/* same as above, but with configuration expressed via json */
margo_instance_id margo_init_opt_json(
const struct hg_init_info *hg_init_info,
const char* json_cfg_string);
/**
* Initializes margo library from given argobots and Mercury instances.
* @param [in] progress_pool Argobots pool to drive communication
......@@ -224,40 +242,6 @@ margo_instance_id margo_init_pool(
ABT_pool handler_pool,
hg_context_t *hg_context) DEPRECATED("use margo_init_ext instead");
/**
* same as margo_init_pool() except that it has an additional argument to
* accept a json configuration string
*/
margo_instance_id margo_init_pool_json(
ABT_pool progress_pool,
ABT_pool handler_pool,
hg_context_t *hg_context,
const char* json_cfg_string);
/**
* Initializes a margo instance using a margo_init_info struct to provide arguments.
*
* @param args Arguments
* @param address Address or protocol
* @param mode MARGO_CLIENT_MODE or MARGO_SERVER_MODE
*
* @return a margo_instance_id or MARGO_INSTANCE_NULL in case of failure.
*
* NOTE: if you are configuring Argobots pools yourself before
* passing them into this function, please consider setting
* ABT_MEM_MAX_NUM_STACKS to a low value (like 8) either in your
* environment or programmatically with putenv() in your code before
* creating the pools to prevent excess memory consumption under
* load from producer/consumer patterns across execution streams that
* fail to utilize per-execution stream stack caches. See
* https://xgitlab.cels.anl.gov/sds/margo/issues/40 for details.
* The margo_init() function does this automatically.
*/
margo_instance_id margo_init_ext(
const char* address,
int mode,
const struct margo_init_info* args);
/**
* Shuts down margo library and its underlying abt and mercury resources
* @param [in] mid Margo instance
......
src_libmargo_la_SOURCES += \
src/margo.c \
src/margo-core.c \
src/margo-diag.c \
src/margo-bulk-pool.c \
src/margo-globals.c \
src/margo-handle-cache.c \
......
This diff is collapsed.
/*
* (C) 2020 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MARGO_DIAG_INTERNAL_H
#define __MARGO_DIAG_INTERNAL_H
#include <stdio.h> /* defines printf for tests */
#include <time.h> /* defines time_t for timings in the test */
#include <stdint.h> /* defines uint32_t etc */
#include <sys/param.h> /* attempt to define endianness */
#ifdef linux
# include <endian.h> /* attempt to define endianness */
#endif
#include "margo-instance.h"
void __margo_sparkline_data_collection_fn(void* foo);
void __margo_print_diag_data(
margo_instance_id mid,
FILE *file,
const char* name,
const char *description,
struct diag_data *data);
void __margo_print_profile_data(
margo_instance_id mid,
FILE *file,
const char* name,
const char *description,
struct diag_data *data);
#define GET_SELF_ADDR_STR(__mid, __addr_str) do { \
hg_addr_t __self_addr; \
hg_size_t __size; \
__addr_str = NULL; \
if (margo_addr_self(__mid, &__self_addr) != HG_SUCCESS) break; \
if (margo_addr_to_string(__mid, NULL, &__size, __self_addr) != HG_SUCCESS) { \
margo_addr_free(__mid, __self_addr); \
break; \
} \
if ((__addr_str = malloc(__size)) == NULL) { \
margo_addr_free(__mid, __self_addr); \
break; \
} \
if (margo_addr_to_string(__mid, __addr_str, &__size, __self_addr) != HG_SUCCESS) { \
free(__addr_str); \
__addr_str = NULL; \
margo_addr_free(__mid, __self_addr); \
break; \
} \
margo_addr_free(__mid, __self_addr); \
} while(0)
#define __DIAG_UPDATE(__data, __time)\
do {\
__data.stats.count++; \
__data.stats.cumulative += (__time); \
if((__time) > __data.stats.max) __data.stats.max = (__time); \
if(__data.stats.min == 0 || (__time) < __data.stats.min) __data.stats.min = (__time); \
} while(0)
#endif
/*
* (C) 2020 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <margo.h>
#include "margo-diag-internal.h"
#include "margo-instance.h"
void __margo_sparkline_data_collection_fn(void* foo)
{
struct margo_instance *mid = (struct margo_instance *)foo;
struct diag_data *stat, *tmp;
/* double check that profile collection should run, else, close this ULT */
if(!mid->profile_enabled) {
ABT_thread_join(mid->sparkline_data_collection_tid);
ABT_thread_free(&mid->sparkline_data_collection_tid);
}
int sleep_time_msec = json_integer_value(json_object_get(mid->json_cfg, "profile_sparkline_timeslice_msec"));
while(!mid->hg_progress_shutdown_flag)
{
margo_thread_sleep(mid, sleep_time_msec);
HASH_ITER(hh, mid->diag_rpc, stat, tmp)
{
if(mid->sparkline_index > 0 && mid->sparkline_index < 100) {
stat->sparkline_time[mid->sparkline_index] = stat->stats.cumulative - stat->sparkline_time[mid->sparkline_index - 1];
stat->sparkline_count[mid->sparkline_index] = stat->stats.count - stat->sparkline_count[mid->sparkline_index - 1];
} else if(mid->sparkline_index == 0) {
stat->sparkline_time[mid->sparkline_index] = stat->stats.cumulative;
stat->sparkline_count[mid->sparkline_index] = stat->stats.count;
} else {
//Drop!
}
}
mid->sparkline_index++;
mid->previous_sparkline_data_collection_time = ABT_get_wtime();
}
return;
}
void __margo_print_diag_data(
margo_instance_id mid,
FILE *file,
const char* name,
const char *description,
struct diag_data *data)
{
double avg;
if(data->stats.count != 0)
avg = data->stats.cumulative/data->stats.count;
else
avg = 0;
fprintf(file, "%s,%.9f,%.9f,%.9f,%.9f,%lu\n",
name,
avg,
data->stats.cumulative,
data->stats.min,
data->stats.max,
data->stats.count);
return;
}
void __margo_print_profile_data(
margo_instance_id mid,
FILE *file,
const char* name,
const char *description,
struct diag_data *data)
{
double avg;
int i;
if(data->stats.count != 0)
avg = data->stats.cumulative/data->stats.count;
else
avg = 0;
/* first line is breadcrumb data */
fprintf(file, "%s,%.9f,%lu,%lu,%d,%.9f,%.9f,%.9f,%lu,%lu,%lu,%lu,%lu,%lu,%lu\n",
name,
avg,
data->key.rpc_breadcrumb,
data->key.addr_hash,
data->type,
data->stats.cumulative,
data->stats.min,
data->stats.max,
data->stats.count,
data->stats.abt_pool_size_hwm,
data->stats.abt_pool_size_lwm,
data->stats.abt_pool_size_cumulative,
data->stats.abt_pool_total_size_hwm,
data->stats.abt_pool_total_size_lwm,
data->stats.abt_pool_total_size_cumulative);
/* second line is sparkline data for the given breadcrumb*/
fprintf(file, "%s,%d;", name, data->type);
for(i = 0; i < mid->sparkline_index; i++)
fprintf(file, "%.9f,%.9f, %d;", data->sparkline_time[i], data->sparkline_count[i], i);
fprintf(file,"\n");
return;
}
/* copy out the entire list of breadcrumbs on this margo instance */
void margo_breadcrumb_snapshot(margo_instance_id mid, struct margo_breadcrumb_snapshot* snap)
{
assert(mid->profile_enabled);
struct diag_data *dd, *tmp;
struct margo_breadcrumb *tmp_bc;
#if 0
fprintf(stderr, "Taking a snapshot\n");
#endif
snap->ptr = calloc(1, sizeof(struct margo_breadcrumb));
tmp_bc = snap->ptr;
HASH_ITER(hh, mid->diag_rpc, dd, tmp)
{
#if 0
fprintf(stderr, "Copying out RPC breadcrumb %d\n", dd->rpc_breadcrumb);
#endif
tmp_bc->stats.min = dd->stats.min;
tmp_bc->stats.max = dd->stats.max;
tmp_bc->type = dd->type;
tmp_bc->key = dd->key;
tmp_bc->stats.count = dd->stats.count;
tmp_bc->stats.cumulative = dd->stats.cumulative;
tmp_bc->stats.abt_pool_total_size_hwm = dd->stats.abt_pool_total_size_hwm;
tmp_bc->stats.abt_pool_total_size_lwm = dd->stats.abt_pool_total_size_lwm;
tmp_bc->stats.abt_pool_total_size_cumulative = dd->stats.abt_pool_total_size_cumulative;
tmp_bc->stats.abt_pool_size_hwm = dd->stats.abt_pool_size_hwm;
tmp_bc->stats.abt_pool_size_lwm = dd->stats.abt_pool_size_lwm;
tmp_bc->stats.abt_pool_size_cumulative = dd->stats.abt_pool_size_cumulative;
tmp_bc->next = calloc(1, sizeof(struct margo_breadcrumb));
tmp_bc = tmp_bc->next;
tmp_bc->next = NULL;
}
}
void margo_diag_dump(margo_instance_id mid, const char* file, int uniquify)
{
FILE *outfile;
time_t ltime;
char revised_file_name[256] = {0};
char * name;
uint64_t hash;
if (!mid->diag_enabled)
return;
if(uniquify)
{
char hostname[128] = {0};
int pid;
gethostname(hostname, 128);
pid = getpid();
sprintf(revised_file_name, "%s-%s-%d.diag", file, hostname, pid);
}
else
{
sprintf(revised_file_name, "%s.diag", file);
}
if(strcmp("-", file) == 0)
{
outfile = stdout;
}
else
{
outfile = fopen(revised_file_name, "a");
if(!outfile)
{
perror("fopen");
return;
}
}
/* TODO: support pattern substitution in file name to create unique
* output files per process
*/
time(&ltime);
fprintf(outfile, "# Margo diagnostics\n");
GET_SELF_ADDR_STR(mid, name);
HASH_JEN(name, strlen(name), hash); /*record own address in the breadcrumb */
fprintf(outfile, "#Addr Hash and Address Name: %lu,%s\n", hash, name);
fprintf(outfile, "# %s\n", ctime(&ltime));
fprintf(outfile, "# Function Name, Average Time Per Call, Cumulative Time, Highwatermark, Lowwatermark, Call Count\n");
__margo_print_diag_data(mid, outfile, "trigger_elapsed",
"Time consumed by HG_Trigger()",
&mid->diag_trigger_elapsed);
__margo_print_diag_data(mid, outfile, "progress_elapsed_zero_timeout",
"Time consumed by HG_Progress() when called with timeout==0",
&mid->diag_progress_elapsed_zero_timeout);
__margo_print_diag_data(mid, outfile, "progress_elapsed_nonzero_timeout",
"Time consumed by HG_Progress() when called with timeout!=0",
&mid->diag_progress_elapsed_nonzero_timeout);
__margo_print_diag_data(mid, outfile, "bulk_create_elapsed",
"Time consumed by HG_Bulk_create()",
&mid->diag_bulk_create_elapsed);
if(outfile != stdout)
fclose(outfile);
return;
}
void margo_profile_dump(margo_instance_id mid, const char* file, int uniquify)
{
FILE *outfile;
time_t ltime;
char revised_file_name[256] = {0};
struct diag_data *dd, *tmp;
char rpc_breadcrumb_str[256] = {0};
struct margo_registered_rpc *tmp_rpc;
char * name;
uint64_t hash;
assert(mid->profile_enabled);
if(uniquify)
{
char hostname[128] = {0};
int pid;
gethostname(hostname, 128);
pid = getpid();
sprintf(revised_file_name, "%s-%s-%d.csv", file, hostname, pid);
}
else
{
sprintf(revised_file_name, "%s.csv", file);
}
if(strcmp("-", file) == 0)
{
outfile = stdout;
}
else
{
outfile = fopen(revised_file_name, "a");
if(!outfile)
{
perror("fopen");
return;
}
}
/* TODO: support pattern substitution in file name to create unique
* output files per process
*/
time(&ltime);
fprintf(outfile, "%u\n", mid->num_registered_rpcs);
GET_SELF_ADDR_STR(mid, name);
HASH_JEN(name, strlen(name), hash); /*record own address in the breadcrumb */
fprintf(outfile, "%lu,%s\n", hash, name);
tmp_rpc = mid->registered_rpcs;
while(tmp_rpc)
{
fprintf(outfile, "0x%.4lx,%s\n", tmp_rpc->rpc_breadcrumb_fragment, tmp_rpc->func_name);
tmp_rpc = tmp_rpc->next;
}
HASH_ITER(hh, mid->diag_rpc, dd, tmp)
{
int i;
uint64_t tmp_breadcrumb;
for(i=0; i<4; i++)
{
tmp_breadcrumb = dd->rpc_breadcrumb;
tmp_breadcrumb >>= (i*16);
tmp_breadcrumb &= 0xffff;
if(!tmp_breadcrumb) continue;
if(i==3)
sprintf(&rpc_breadcrumb_str[i*7], "0x%.4lx", tmp_breadcrumb);
else
sprintf(&rpc_breadcrumb_str[i*7], "0x%.4lx ", tmp_breadcrumb);
}
__margo_print_profile_data(mid, outfile, rpc_breadcrumb_str, "RPC statistics", dd);
}
if(outfile != stdout)
fclose(outfile);
return;
}
......@@ -9,6 +9,10 @@
#include <margo-logging.h>
#include <jansson.h>
#include "margo-instance.h"
#include "margo-progress.h"
#include "margo-timer.h"
#include "margo-diag-internal.h"
#include "margo-handle-cache.h"
#include "margo-globals.h"
#include "margo-macros.h"
......@@ -49,6 +53,10 @@ static int create_xstream_from_config(
static void set_argobots_environment_variables(
json_t* config);
// Shutdown logic for a margo instance
static void remote_shutdown_ult(hg_handle_t handle);
static DECLARE_MARGO_RPC_HANDLER(remote_shutdown_ult);
margo_instance_id margo_init_ext(
const char* address,
......@@ -64,17 +72,18 @@ margo_instance_id margo_init_ext(
hg_return_t hret;
margo_instance_id mid = MARGO_INSTANCE_NULL;
hg_class_t* hg_class = NULL;
hg_context_t* hg_context = NULL;
struct hg_init_info hg_init_info = { 0 };
hg_addr_t self_addr = HG_ADDR_NULL;
ABT_pool* pools = NULL;
size_t num_pools = 0;
ABT_xstream* xstreams = NULL;
bool* owns_xstream = NULL;
size_t num_xstreams = 0;
ABT_pool progress_pool = ABT_POOL_NULL;
ABT_pool rpc_pool = ABT_POOL_NULL;
hg_class_t* hg_class = NULL;
hg_context_t* hg_context = NULL;
uint8_t hg_ownership = 0;
struct hg_init_info hg_init_info = { 0 };
hg_addr_t self_addr = HG_ADDR_NULL;
ABT_pool* pools = NULL;
size_t num_pools = 0;
ABT_xstream* xstreams = NULL;
bool* owns_xstream = NULL;
size_t num_xstreams = 0;
ABT_pool progress_pool = ABT_POOL_NULL;
ABT_pool rpc_pool = ABT_POOL_NULL;
if(args.json_config) {
// read JSON config from provided string argument
......@@ -120,6 +129,7 @@ margo_instance_id margo_init_ext(
MARGO_ERROR(0, "Could not initialize hg_class");
goto error;
}
hg_ownership |= MARGO_OWNS_HG_CLASS;
}
// handle hg_context
......@@ -133,6 +143,7 @@ margo_instance_id margo_init_ext(
MARGO_ERROR(0, "Could not initialize hg_context");
goto error;