Commit 143099c4 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

continuing the refactoring

parent 0f115844
......@@ -19,6 +19,8 @@ extern "C" {
#include <margo-diag.h>
#define DEPRECATED(msg) __attribute__((deprecated(msg)))
/* determine how much of the Mercury ID space to use for Margo provider IDs */
#define __MARGO_PROVIDER_ID_SIZE (sizeof(hg_id_t)/4)
#define __MARGO_RPC_HASH_SIZE (__MARGO_PROVIDER_ID_SIZE * 3)
......@@ -43,6 +45,95 @@ typedef void(*margo_finalize_callback_t)(void*);
#define MARGO_DEFAULT_PROVIDER_ID 0
#define MARGO_MAX_PROVIDER_ID ((1 << (8*__MARGO_PROVIDER_ID_SIZE))-1)
/**
* The margo_init_info structure should be passed to margo_init_ext
* to finely configure Margo. The structure can be memset to 0 to have
* margo use default values (no progress thread, no rpc thread, default
* initialization of mercury, etc.). For any field that is not NULL,
* margo_init_ext will first look for a configuration in the json_config
* string. If no configuration is found or of json_config is NULL, margo
* will fall back to default.
*/
struct margo_init_info {
const char* json_config; /* JSON-formatted string */
ABT_pool progress_pool; /* Progress pool */
ABT_pool rpc_pool; /* RPC handler pool */
hg_class_t* hg_class; /* Mercury class */
hg_context_t* hg_context; /* Mercury context */
struct hg_init_info* hg_init_info; /* Mercury init info */
};
/**
* Example JSON configuration:
* ------------------------------------------------
{
"mercury" : {
"address" : "na+sm://",
"listening" : false,
"auto_sm" : true,
"version" : "2.0.0",
"stats" : false,
"na_no_block" : false,
"na_no_retry" : false,
"max_contexts" : 1,
"ip_subnet" : "",
"auth_key" : ""
},
"argobots" : {
"abt_mem_max_num_stacks" : 8,
"abt_thread_stacksize" : 2097152,
"version" : "1.0.0",
"pools" : [
{
"name" : "my_progress_pool",
"kind" : "fifo_wait",
"access" : "mpmc"
},
{
"name" : "my_rpc_pool",
"kind" : "fifo_wait",
"access" : "mpmc"
}
],
"xstreams" : [
{
"name" : "my_progress_xstream",
"cpubind" : 0,
"affinity" : [ 0, 1 ],
"scheduler" : {
"type" : "basic_wait",
"pools" : [ "my_progress_pool" ]
}
},
{
"name" : "my_rpc_xstream_0",
"cpubind" : 2,
"affinity" : [ 2, 3, 4, 5 ],
"scheduler" : {
"type" : "basic_wait",
"pools" : [ "my_rpc_pool" ]
}
},
{
"name" : "my_rpc_xstream_1",
"cpubind" : 6,
"affinity" : [ 6, 7 ],
"scheduler" : {
"type" : "basic_wait",
"pools" : [ "my_rpc_pool" ]
}
}
]
},
"handle_cache_size" : 32,
"profile_sparkline_timeslice_msec" : 1000,
"progress_timeout_ub_msec" : 100,
"enable_profiling" : false,
"enable_diagnostics" : false
}
* ------------------------------------------------
*/
/**
* Initializes margo library.
* @param [in] addr_str Mercury host address with port number
......@@ -67,11 +158,11 @@ typedef void(*margo_finalize_callback_t)(void*);
* call margo_wait_for_finalize() after margo_init() to relinguish control to
* Margo.
*/
#define margo_init(_addr_str, _mode, _use_progress_thread, _rpc_thread_count)\
margo_init_opt(_addr_str, _mode, NULL, _use_progress_thread, _rpc_thread_count)
#define margo_init_json(_json_cfg_string)\
margo_init_opt_json(NULL, _json_cfg_string)
margo_instance_id margo_init(
const char *addr_str,
int mode,
int use_progress_thread,
int rpc_thread_count);
/**
* Initializes margo library with custom Mercury options.
......@@ -104,7 +195,7 @@ margo_instance_id margo_init_opt(
int mode,
const struct hg_init_info *hg_init_info,
int use_progress_thread,
int rpc_thread_count);
int rpc_thread_count) DEPRECATED("use margo_init_ext instead");
/* same as above, but with configuration expressed via json */
margo_instance_id margo_init_opt_json(
......@@ -131,7 +222,7 @@ margo_instance_id margo_init_opt_json(
margo_instance_id margo_init_pool(
ABT_pool progress_pool,
ABT_pool handler_pool,
hg_context_t *hg_context);
hg_context_t *hg_context) DEPRECATED("use margo_init_ext instead");
/**
* same as margo_init_pool() except that it has an additional argument to
......@@ -143,6 +234,30 @@ margo_instance_id margo_init_pool_json(
hg_context_t *hg_context,
const char* json_cfg_string);
/**
* Initializes a margo instance using a margo_init_info struct to provide arguments.
*
* @param args Arguments
* @param address Address or protocol
* @param mode MARGO_CLIENT_MODE or MARGO_SERVER_MODE
*
* @return a margo_instance_id or MARGO_INSTANCE_NULL in case of failure.
*
* NOTE: if you are configuring Argobots pools yourself before
* passing them into this function, please consider setting
* ABT_MEM_MAX_NUM_STACKS to a low value (like 8) either in your
* environment or programmatically with putenv() in your code before
* creating the pools to prevent excess memory consumption under
* load from producer/consumer patterns across execution streams that
* fail to utilize per-execution stream stack caches. See
* https://xgitlab.cels.anl.gov/sds/margo/issues/40 for details.
* The margo_init() function does this automatically.
*/
margo_instance_id margo_init_ext(
const char* address,
int mode,
const struct margo_init_info* args);
/**
* Shuts down margo library and its underlying abt and mercury resources
* @param [in] mid Margo instance
......
spack:
specs:
- mochi-cfg
- autoconf
- m4
- automake
- libtool
- pkg-config
- argobots
- mercury
concretization: together
src_libmargo_la_SOURCES += \
src/margo.c \
src/margo-bulk-pool.c \
src/margo-globals.c \
src/margo-handle-cache.c \
src/margo-init.c \
src/margo-logging.c \
src/margo-timer.h \
src/margo-timer.c \
src/margo-bulk-pool.c
src/margo-timer.c
/*
* (C) 2020 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include "margo-globals.h"
#include "margo-logging.h"
int g_margo_num_instances = 0;
ABT_mutex g_margo_num_instances_mtx = ABT_MUTEX_NULL;
bool g_margo_abt_init = 0;
ABT_key g_margo_rpc_breadcrumb_key = ABT_KEY_NULL;
ABT_key g_margo_target_timing_key = ABT_KEY_NULL;
margo_log_level g_margo_log_level = MARGO_LOG_ERROR;
/*
* (C) 2020 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MARGO_GLOBALS_H
#define __MARGO_GLOBALS_H
// This file contains all the global variables that must be shared across
// compilation units. These globals must start with the g_margo_ prefix.
#include <stdbool.h>
#include <margo.h>
// If margo is initializing ABT, we need to track how many instances of margo
// are being created, so that the last one can call ABT_finalize.
// If margo initializes ABT, g_margo_num_instances_mtx will be created, so
// in later calls and in margo_cleanup we can check for
// g_margo_num_instances_mtx != ABT_MUTEX_NULL
// to know if we should do something to cleanup ABT as well.
extern int g_margo_num_instances;
extern ABT_mutex g_margo_num_instances_mtx;
extern bool g_margo_abt_init;
// Keys for Argobots thread-local storage to track RPC breadcrumbs
// across thread execution.
extern ABT_key g_margo_rpc_breadcrumb_key;
extern ABT_key g_margo_target_timing_key;
#endif
/*
* (C) 2020 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include "margo-instance.h"
#include "margo-handle-cache.h"
struct margo_handle_cache_el
{
hg_handle_t handle;
UT_hash_handle hh; /* in-use hash link */
struct margo_handle_cache_el *next; /* free list link */
};
hg_return_t __margo_handle_cache_init(margo_instance_id mid, size_t handle_cache_size)
{
int i;
struct margo_handle_cache_el *el;
hg_return_t hret = HG_SUCCESS;
ABT_mutex_create(&(mid->handle_cache_mtx));
for(i = 0; i < handle_cache_size; i++)
{
el = malloc(sizeof(*el));
if(!el)
{
hret = HG_NOMEM_ERROR;
__margo_handle_cache_destroy(mid);
break;
}
/* create handle with NULL_ADDRs, we will reset later to valid addrs */
hret = HG_Create(mid->hg_context, HG_ADDR_NULL, 0, &el->handle);
if(hret != HG_SUCCESS)
{
free(el);
__margo_handle_cache_destroy(mid);
break;
}
/* add to the free list */
LL_PREPEND(mid->free_handle_list, el);
}
return hret;
}
void __margo_handle_cache_destroy(margo_instance_id mid)
{
struct margo_handle_cache_el *el, *tmp;
/* only free handle list elements -- handles in hash are still in use */
LL_FOREACH_SAFE(mid->free_handle_list, el, tmp)
{
LL_DELETE(mid->free_handle_list, el);
HG_Destroy(el->handle);
free(el);
}
ABT_mutex_free(&mid->handle_cache_mtx);
return;
}
hg_return_t __margo_handle_cache_get(
margo_instance_id mid,
hg_addr_t addr,
hg_id_t id,
hg_handle_t *handle)
{
struct margo_handle_cache_el *el;
hg_return_t hret = HG_SUCCESS;
ABT_mutex_lock(mid->handle_cache_mtx);
if(!mid->free_handle_list)
{
/* if no available handles, just fall through */
hret = HG_OTHER_ERROR;
goto finish;
}
/* pop first element from the free handle list */
el = mid->free_handle_list;
LL_DELETE(mid->free_handle_list, el);
/* reset handle */
hret = HG_Reset(el->handle, addr, id);
if(hret == HG_SUCCESS)
{
/* put on in-use list and pass back handle */
HASH_ADD(hh, mid->used_handle_hash, handle, sizeof(hg_handle_t), el);
*handle = el->handle;
}
else
{
/* reset failed, add handle back to the free list */
LL_APPEND(mid->free_handle_list, el);
}
finish:
ABT_mutex_unlock(mid->handle_cache_mtx);
return hret;
}
hg_return_t __margo_handle_cache_put(
margo_instance_id mid,
hg_handle_t handle)
{
struct margo_handle_cache_el *el;
hg_return_t hret = HG_SUCCESS;
ABT_mutex_lock(mid->handle_cache_mtx);
/* look for handle in the in-use hash */
HASH_FIND(hh, mid->used_handle_hash, &handle, sizeof(hg_handle_t), el);
if(!el)
{
/* this handle was manually allocated -- just fall through */
hret = HG_OTHER_ERROR;
goto finish;
}
/* remove from the in-use hash */
HASH_DELETE(hh, mid->used_handle_hash, el);
/* add to the tail of the free handle list */
LL_APPEND(mid->free_handle_list, el);
finish:
ABT_mutex_unlock(mid->handle_cache_mtx);
return hret;
}
/*
* (C) 2020 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MARGO_HANDLE_CACHE_H
#define __MARGO_HANDLE_CACHE_H
#include <margo.h>
// private functions that initialize the handle cache for a margo instance,
// and access cached handles.
hg_return_t __margo_handle_cache_init(
margo_instance_id mid,
size_t handle_cache_size);
void __margo_handle_cache_destroy(margo_instance_id mid);
hg_return_t __margo_handle_cache_get(
margo_instance_id mid,
hg_addr_t addr,
hg_id_t id,
hg_handle_t *handle);
hg_return_t __margo_handle_cache_put(
margo_instance_id mid,
hg_handle_t handle);
#endif
This diff is collapsed.
......@@ -45,12 +45,7 @@ struct diag_data
UT_hash_handle hh; /* hash table link */
};
struct margo_handle_cache_el
{
hg_handle_t handle;
UT_hash_handle hh; /* in-use hash link */
struct margo_handle_cache_el *next; /* free list link */
};
struct margo_handle_cache_el; /* defined in margo-handle-cache.c */
struct margo_finalize_cb
{
......@@ -91,7 +86,7 @@ struct margo_instance
ABT_xstream *rpc_xstreams;
int num_handler_pool_threads;
int hg_progress_timeout_ub;
uint16_t num_registered_rpcs; /* number of registered rpc's by all providers on this instance */
uint16_t num_registered_rpcs; /* number of registered rpc's by all providers on this instance */
/* list of rpcs registered on this instance for debugging and profiling purposes */
struct margo_registered_rpc *registered_rpcs;
......@@ -140,7 +135,7 @@ struct margo_instance
struct diag_data *diag_rpc;
ABT_mutex diag_rpc_mutex;
json_t *component_cfg;
json_t *json_cfg;
/* logging */
struct margo_logger logger;
......
......@@ -3,7 +3,7 @@
*
* See COPYRIGHT in top-level directory.
*/
#include "margo-internal.h"
#include "margo-instance.h"
#include <margo-logging.h>
static margo_log_level global_log_level = MARGO_LOG_ERROR;
......
......@@ -14,8 +14,8 @@
// After a call to this macro, __out is set to the ceated/found field.
#define CONFIG_HAS_OR_CREATE_OBJECT(__config, __key, __fullname, __out) do { \
__out = json_object_get(__config, __key); \
if(__out && json_is_object(__out)) { \
fprintf(stderr, "ERROR: %s is in configuration but is not an object type\n", __fullname); \
if(__out && !json_is_object(__out)) { \
MARGO_ERROR(0, "\"%s\" is in configuration but is not an object", __fullname); \
return -1; \
} \
if(!__out) { \
......@@ -30,8 +30,8 @@
// After a call to this macro, __out is set to the ceated/found field.
#define CONFIG_HAS_OR_CREATE_ARRAY(__config, __key, __fullname, __out) do { \
__out = json_object_get(__config, __key); \
if(__out && json_is_array(__out)) { \
fprintf(stderr, "ERROR: %s is in configuration but is not an array type\n", __fullname); \
if(__out && !json_is_array(__out)) { \
MARGO_ERROR(0, "\"%s\" is in configuration but is not an array", __fullname); \
return -1; \
} \
if(!__out) { \
......@@ -47,8 +47,8 @@
// After a call to this macro, __out is set to the ceated/found field.
#define CONFIG_HAS_OR_CREATE(__config, __type, __key, __value, __fullname, __out) do { \
__out = json_object_get(__config, __key); \
if(__out && json_is_##__type(__out)) { \
fprintf(stderr, "ERROR: %s in configuration but has an incorrect type\n", __fullname); \
if(__out && !json_is_##__type(__out)) { \
MARGO_ERROR(0, "\"%s\" in configuration but has an incorrect type (expected %s)", __fullname, #__type); \
return -1; \
} \
if(!__out) { \
......@@ -63,11 +63,11 @@
#define CONFIG_MUST_HAVE(__config, __type, __key, __fullname, __out) do { \
__out = json_object_get(__config, __key); \
if(!__out) { \
fprintf(stderr, "ERROR: %s not found in configuration\n", __fullname); \
MARGO_ERROR(0, "\"%s\" not found in configuration", __fullname); \
return -1; \
} \
if(!json_is_##__type(__out)) { \
fprintf(stderr, "ERROR: %s in configuration has incorrect type (should be %s)\n", __fullname, #__type); \
MARGO_ERROR(0, "\"%s\" in configuration has incorrect type (expected %s)", __fullname, #__type); \
return -1; \
} \
} while(0)
......@@ -78,10 +78,10 @@
json_t* _tmp = json_object_get(__config, __key); \
if(_tmp && __warning) { \
if(!json_is_string(_tmp)) \
fprintf(stderr, "WARNING: overriding field %s with value %s\n", \
MARGO_WARNING(0, "Overriding field \"%s\" with value \"%s\"", \
__fullname, __value); \
else if(strcmp(json_string_value(_tmp), __value) != 0) \
fprintf(stderr, "WARNING: overriding field %s (%s) with value %s\n", \
MARGO_WARNING(0, "Overriding field \"%s\" (\"%s\") with value \"%s\"", \
__fullname, json_string_value(_tmp), __value); \
} \
_tmp = json_string(__value); \
......@@ -94,10 +94,10 @@
json_t* _tmp = json_object_get(__config, __key); \
if(_tmp && __warning) { \
if(!json_is_boolean(_tmp)) \
fprintf(stderr, "WARNING: overriding field %s with value %s\n", \
MARGO_WARNING(0, "Overriding field \"%s\" with value \"%s\"", \
__field_name, __value ? "true" : "false"); \
else if(json_boolean_value(_tmp) != !!__value) \
fprintf(stderr, "WARNING: overriding field %s (%s) with value %s\n", \
MARGO_WARNING(0, "Overriding field \"%s\" (\"%s\") with value \"%s\"", \
__field_name, json_boolean_value(_tmp) ? "true" : "false", \
__value ? "true" : "false"); \
} \
......@@ -110,10 +110,10 @@
json_t* _tmp = json_object_get(__config, __key); \
if(_tmp && __warning) { \
if(!json_is_integer(_tmp)) \
fprintf(stderr, "WARNING: overriding field %s with value %d\n", \
MARGO_WARNING(0, "Overriding field \"%s\" with value %d", \
__field_name, (int)__value); \
else if(json_integer_value(_tmp) != __value) \
fprintf(stderr, "WARNING: overriding field %s (%d) with value %d\n", \
MARGO_WARNING(0, "Overriding field \"%s\" (%d) with value %d", \
__field_name, json_integer_value(_tmp), __value); \
} \
json_object_set_new(__config, __key, json_integer(__value)); \
......@@ -123,7 +123,7 @@
#define CONFIG_INTEGER_MUST_BE_POSITIVE(__config, __key, __fullname) do { \
int _tmp = json_integer_value(json_object_get(__config, __key)); \
if(_tmp < 0) { \
fprintf(stderr, "ERROR: %s must not be negative\n", __fullname); \
MARGO_ERROR(0, "\"%s\" must not be negative", __fullname); \
return -1; \
} \
} while(0)
......@@ -167,7 +167,7 @@
} \
} \
if(!_found) { \
fprintf(stderr, "ERROR: could not find element named \"%s\" in %s array\n", \
MARGO_ERROR(0, "Could not find element named \"%s\" in \"%s\" array", \
__name, __array_name); \
return -1; \
} \
......@@ -178,9 +178,9 @@
#define CONFIG_IS_IN_ENUM_STRING(__config, __field_name, ...) do { \
unsigned _i = 0; \
const char* _vals[] = { __VA_ARGS__, NULL }; \
while(_vals[i] && strcmp(_vals[i], json_string_value(__config))) _i++; \
if(!_vals[i]) { \
fprintf(stderr, "ERROR: invalid enum value for %s\n (%s)", __field_name, json_string_value(__config)); \
while(_vals[_i] && strcmp(_vals[_i], json_string_value(__config))) _i++; \
if(!_vals[_i]) { \
MARGO_ERROR(0, "Invalid enum value for \"%s\" (\"%s\")", __field_name, json_string_value(__config)); \
return -1; \
} \
} while(0)
......@@ -196,7 +196,7 @@
json_t* _a_name = json_object_get(_a, "name"); \
json_t* _b_name = json_object_get(_b, "name"); \
if(json_equal(_a_name, _b_name)) { \
fprintf(stderr, "ERROR: found two elements with the same name (%s) in %s", \
MARGO_ERROR(0, "Found two elements with the same name (\"%s\") in \"%s\"", \
json_string_value(_a_name), __container_name); \
return -1; \
} \
......@@ -211,16 +211,16 @@
const char* _name = json_string_value(_name_json); \
unsigned _len = strlen(_name); \
if(_len == 0) { \
fprintf(stderr, "ERROR: empty name field found\n"); \
MARGO_ERROR(0, "Empty \"name\" field"); \
return -1; \
} \
if(isdigit(_name[0])) { \
fprintf(stderr, "ERROR: first character of a name cannot be a digit\n"); \
MARGO_ERROR(0, "First character of a name cannot be a digit"); \
return -1; \
} \
for(unsigned _i=0; i < _len; i++) { \
if(!(isalnum(_name[_i]) || _name[i] == '_')) { \
fprintf(stderr, "ERROR: invalid character \"%c\" found in name\n", _name[i]); \
for(unsigned _i=0; _i < _len; _i++) { \
if(!(isalnum(_name[_i]) || _name[_i] == '_')) { \
MARGO_ERROR(0, "Invalid character \"%c\" found in name \"%s\"", _name[_i], _name); \
return -1; \
} \
} \
......@@ -244,7 +244,7 @@
json_object_set_new(_x, "affinity", json_array()); \
int _pool_index[] = { __VA_ARGS__, -1 }; \
json_t* _s = json_object(); \
json_object_set_new(_s, "sched_predef", json_string(__sched_predef)); \
json_object_set_new(_s, "type", json_string(__sched_predef)); \
json_t* _s_pools = json_array(); \
unsigned _i = 0; \
while(_pool_index[_i] != -1) { \
......
......@@ -13,6 +13,7 @@
#include <abt.h>
#include "margo.h"
#include "margo-instance.h"
#include "margo-timer.h"
#include "utlist.h"
......@@ -24,12 +25,12 @@ struct margo_timer_list
margo_timer_t *queue_head;
};
static void margo_timer_queue(
static void __margo_timer_queue(
struct margo_timer_list *timer_lst,
margo_timer_t *timer);
struct margo_timer_list* margo_timer_list_create()
struct margo_timer_list* __margo_timer_list_create()
{
struct margo_timer_list *timer_lst;
......@@ -43,7 +44,7 @@ struct margo_timer_list* margo_timer_list_create()
return timer_lst;
}
void margo_timer_list_free(margo_instance_id mid, struct margo_timer_list* timer_lst)
void __margo_timer_list_free(margo_instance_id mid, struct margo_timer_list* timer_lst)
{
margo_timer_t *cur;
ABT_pool handler_pool;
......@@ -82,7 +83,7 @@ void margo_timer_list_free(margo_instance_id mid, struct margo_timer_list* timer
return;
}
void margo_timer_init(
void __margo_timer_init(
margo_instance_id mid,
margo_timer_t *timer,
margo_timer_cb_fn cb_fn,
......@@ -91,7 +92,7 @@ void margo_timer_init(
{
struct margo_timer_list *timer_lst;