Commit dd2f4fd5 authored by Philip Carns's avatar Philip Carns
Browse files

Merge branch 'carns/dev-json-config' into 'master'

implement json configuration

See merge request !16
parents 1e69eda8 a968469f
......@@ -2,7 +2,7 @@
# Process this file with autoconf to produce a configure script.
AC_PREREQ([2.63])
AC_INIT([bake], [0.3.4], [],[],[])
AC_INIT([bake], [0.4], [],[],[])
AC_CONFIG_MACRO_DIR([m4])
LT_INIT
......@@ -72,6 +72,12 @@ LIBS="$ABTIO_LIBS $LIBS"
CPPFLAGS="$ABTIO_CFLAGS $CPPFLAGS"
CFLAGS="$ABTIO_CFLAGS $CFLAGS"
PKG_CHECK_MODULES([MOCHICFG],[mochi-cfg],[],
[AC_MSG_ERROR([Could not find working mochi-cfg installation!])])
LIBS="$MOCHICFG_LIBS $LIBS"
CPPFLAGS="$MOCHICFG_CFLAGS $CPPFLAGS"
CFLAGS="$MOCHICFG_CFLAGS $CFLAGS"
# NOTE: See README.md if the following does not work for you; some versions of
# nvml/pmem install broken .pc files
PKG_CHECK_MODULES([LIBPMEMOBJ],[libpmemobj],[],
......
......@@ -43,7 +43,6 @@ int bake_makepool(
* @param[in] mid Margo instance identifier
* @param[in] provider_id provider id
* @param[in] pool Pool on which to run the RPC handlers
* @param[in] target_name path to PMEM backend file
* @param[out] provider resulting provider
* @returns 0 on success, -1 otherwise
*/
......@@ -53,6 +52,22 @@ int bake_provider_register(
ABT_pool pool,
bake_provider_t* provider);
/**
* Initializes a BAKE provider with json configuration.
*
* @param[in] mid Margo instance identifier
* @param[in] provider_id provider id
* @param[in] pool Pool on which to run the RPC handlers
* @param[out] provider resulting provider
* @returns 0 on success, -1 otherwise
*/
int bake_provider_register_json(
margo_instance_id mid,
uint16_t provider_id,
ABT_pool pool,
bake_provider_t* provider,
const char* json_cfg_string);
/**
* @brief Deregisters and destroys the provider.
*
......@@ -129,10 +144,13 @@ int bake_provider_list_storage_targets(
bake_provider_t provider,
bake_target_id_t* targets);
/* TODO: the following configuration management functions would ideally be
* split off into a dedicated component. Treating this as a prototype for
* now.
/**
* Retrieves complete configuration for provider, encoded as json
*
* @param [in] provider
 * @returns null-terminated string that must be freed by the caller
*/
char* bake_provider_get_config(bake_provider_t provider);
/**
* @brief Set configuration parameters as string key/value pairs
......
......@@ -93,10 +93,6 @@ typedef int (*bake_create_fileset_fn)(backend_context_t context,
remi_fileset_t* fileset);
#endif
typedef int (*bake_set_conf_fn)(backend_context_t context,
const char* key,
const char* value);
typedef struct bake_backend {
const char* name;
bake_backend_initialize_fn _initialize;
......@@ -116,7 +112,6 @@ typedef struct bake_backend {
#ifdef USE_REMI
bake_create_fileset_fn _create_fileset;
#endif
bake_set_conf_fn _set_conf;
} bake_backend;
typedef bake_backend* bake_backend_t;
......
......@@ -57,6 +57,7 @@ typedef struct {
int log_fd; /* file descriptor for log */
off_t log_offset; /* next available unused offset in log */
ABT_mutex log_offset_mutex; /* protects the above during concurrent region creation */
/* TODO: the abt-io instance should be per provider, not target */
abt_io_instance_id abtioi; /* abt-io instance used by this provider */
bake_root_t* file_root;
char* root;
......@@ -163,12 +164,16 @@ static int bake_file_backend_initialize(bake_provider_t provider,
const char *tmp;
ptrdiff_t d;
struct stat statbuf;
json_t *abt_io_cfg;
char *abt_io_cfg_string = NULL;
char *abt_io_cfg_string_new = NULL;
if(!provider->config.pipeline_enable)
if(provider->poolset == MARGO_BULK_POOLSET_NULL)
{
fprintf(stderr, "Error: The Bake file backend requires pipelining.\n");
fprintf(stderr, " Enable pipelining with -p on the bake-server-daemon command line or\n");
fprintf(stderr, " programmatically with bake_provider_set_conf(provider, \"pipeline_enabled\", \"1\")\n");
fprintf(stderr, " Enable pipelining with -p on the bake-server-daemon command line,\n");
fprintf(stderr, " programmatically with bake_provider_set_conf(provider, \"pipeline_enabled\", \"1\"), or\n");
fprintf(stderr, " via json configuration.\n");
return(BAKE_ERR_INVALID_ARG);
}
......@@ -179,15 +184,28 @@ static int bake_file_backend_initialize(bake_provider_t provider,
d = tmp - path;
new_entry->root = strndup(path, d);
/* initialize an abt-io instance just for this target */
/* TODO: make number of backing threads tunable */
new_entry->abtioi = abt_io_init(16);
/* Look for abt-io object within this provider config. If
* present, use it. If not then initialize abt-io with defaults.
*/
mochi_cfg_get_object(provider->prov_cfg, "abt_io", &abt_io_cfg);
if(abt_io_cfg)
abt_io_cfg_string = mochi_cfg_emit(abt_io_cfg, "abt-io");
if(abt_io_cfg_string)
new_entry->abtioi = abt_io_init_json(abt_io_cfg_string);
else
new_entry->abtioi = abt_io_init(16);
if(!new_entry->abtioi)
{
ret = BAKE_ERR_IO;
goto error_cleanup;
}
/* query resulting runtime configuration from abt-io, and update that
* sub-section of the provider configuration
*/
abt_io_cfg_string_new = abt_io_get_config(new_entry->abtioi);
mochi_cfg_set_object_by_string(provider->prov_cfg, "abt-io", abt_io_cfg_string_new);
new_entry->log_fd = abt_io_open(new_entry->abtioi,
path, O_RDWR|O_DIRECT, 0);
if(new_entry->log_fd < 0) {
......@@ -583,13 +601,6 @@ static int bake_file_migrate_region(backend_context_t context,
return BAKE_ERR_OP_UNSUPPORTED;
}
/* No-op configuration hook for the file backend: it currently exposes no
 * string key/value parameters, so every request is accepted and ignored.
 * Returns 0 (success) unconditionally.
 */
static int bake_file_set_conf(backend_context_t context,
    const char* key,
    const char* value)
{
    /* silence unused-parameter warnings; nothing to configure */
    (void)context;
    (void)key;
    (void)value;
    return(0);
}
#ifdef USE_REMI
static int bake_file_create_fileset(backend_context_t context,
remi_fileset_t* fileset)
......@@ -638,7 +649,6 @@ bake_backend g_bake_file_backend = {
#ifdef USE_REMI
._create_fileset = bake_file_create_fileset,
#endif
._set_conf = bake_file_set_conf
};
/* common utility function for relaying data in read_bulk/write_bulk */
......
......@@ -200,7 +200,7 @@ static int write_transfer_data(
/* resolve addr, could be addr of rpc sender (normal case) or a third
* party (proxy write)
*/
if(provider->config.pipeline_enable == 0)
if(provider->poolset == MARGO_BULK_POOLSET_NULL)
{
/* normal path; no pipeline or intermediate buffers */
......@@ -700,13 +700,6 @@ error:
}
#endif
/* No-op configuration hook for the pmem backend: it currently exposes no
 * string key/value parameters, so every request is accepted and ignored.
 * Returns 0 (success) unconditionally.
 */
static int bake_pmem_set_conf(backend_context_t context,
    const char* key,
    const char* value)
{
    /* silence unused-parameter warnings; nothing to configure */
    (void)context;
    (void)key;
    (void)value;
    return(0);
}
bake_backend g_bake_pmem_backend = {
.name = "pmem",
._initialize = bake_pmem_backend_initialize,
......@@ -726,7 +719,6 @@ bake_backend g_bake_pmem_backend = {
#ifdef USE_REMI
._create_fileset = bake_pmem_create_fileset,
#endif
._set_conf = bake_pmem_set_conf
};
static void xfer_ult(void *_args)
......
......@@ -11,6 +11,7 @@
#include <libpmemobj.h>
#include <unistd.h>
#include <fcntl.h>
#include <mochi-cfg.h>
#include <margo.h>
#include <margo-bulk-pool.h>
#ifdef USE_REMI
......@@ -29,15 +30,6 @@ typedef struct
UT_hash_handle hh;
} bake_target_t;
struct bake_provider_conf
{
unsigned pipeline_enable; /* pipeline yes or no; implies intermediate buffering */
unsigned pipeline_npools; /* number of preallocated buffer pools */
unsigned pipeline_nbuffers_per_pool; /* buffers per buffer pool */
unsigned pipeline_first_buffer_size; /* size of buffers in smallest pool */
unsigned pipeline_multiplier; /* factor size increase per pool */
};
typedef struct bake_provider
{
margo_instance_id mid;
......@@ -55,7 +47,7 @@ typedef struct bake_provider
int owns_remi_provider;
#endif
struct bake_provider_conf config; /* configuration for transfers */
json_t *prov_cfg; /* provider-specific configuration */
margo_bulk_poolset_t poolset; /* intermediate buffers, if used */
// list of RPC ids
......
/*
* (C) 2015 The University of Chicago
*
*
* See COPYRIGHT in top-level directory.
*/
......@@ -10,8 +10,28 @@
#include <assert.h>
#include <unistd.h>
#include <margo.h>
#include <mochi-cfg.h>
#include <bake-server.h>
/* NOTE: in the following we don't specify all of the sub-component
* parameters; just things that might be overridden by command line or are
* mandatory for this daemon.
*
 * "bake_providers" is an array to support multiple providers, each of which
 * may have its own configuration
*
* "targets" is also an array to support multiple targets per provider
*/
# define BAKE_SERVER_DAEMON_DEFAULT_CFG \
"{ \"bake-server-daemon\": {" \
" \"host_file\": \"\"," \
" \"bake_providers\": []," \
" \"margo\": {" \
" \"mercury\": {" \
" \"addr_str\": \"na+sm://\"," \
" \"server_mode\": 1" \
"}}}}"
typedef enum {
MODE_TARGETS = 0,
MODE_PROVIDERS = 1
......@@ -19,6 +39,7 @@ typedef enum {
struct options
{
char *json_input;
char *listen_addr_str;
unsigned num_pools;
char **bake_pools;
......@@ -27,19 +48,100 @@ struct options
mplex_mode_t mplex_mode;
};
#define MAX_PROVIDERS 256
static bake_provider_t g_provider_array[MAX_PROVIDERS];
static int g_provider_array_size = 0;
static void usage(int argc, char **argv)
{
fprintf(stderr, "Usage: bake-server-daemon [OPTIONS] <listen_addr> <bake_pool1> <bake_pool2> ...\n");
fprintf(stderr, " listen_addr is the Mercury address to listen on\n");
fprintf(stderr, "Usage: bake-server-daemon [OPTIONS] <bake_pool1> <bake_pool2> ...\n");
fprintf(stderr, " bake_pool is the path to the BAKE pool\n");
fprintf(stderr, " (prepend pmem: or file: to specify backend format)\n");
fprintf(stderr, " [-l listen_addr] is the Mercury address to listen on\n");
fprintf(stderr, " [-f filename] to write the server address to a file\n");
fprintf(stderr, " [-m mode] multiplexing mode (providers or targets) for managing multiple pools (default is targets)\n");
fprintf(stderr, " [-p] enable pipelining\n");
fprintf(stderr, " [-j filename] json configuration file \n");
fprintf(stderr, "Example: ./bake-server-daemon tcp://localhost:1234 /dev/shm/foo.dat /dev/shm/bar.dat\n");
return;
}
static json_t* resolve_json(struct options *opts)
{
json_t* cfg;
json_t* margo_cfg;
json_t* hg_cfg;
json_t* prov_array;
json_t* prov_cfg;
json_t* target_array;
int i;
/* Concept: we have 3 sources of configuration parameters to consider,
* in order of precedence:
* - explicit command line arguments
* - json configuration file
* - default parameters
*
* We accomplish this by first parsing config file (if present) and
* setting remaining defaults. Then we explicitly set extra parameters
* that may have been set on command line.
*/
/* read json input config file and populate missing defaults */
if(opts->json_input)
cfg = mochi_cfg_get_component_file(opts->json_input, "bake-server-daemon", BAKE_SERVER_DAEMON_DEFAULT_CFG);
else
cfg = mochi_cfg_get_component("{\"bake-server-daemon\":{}}", "bake-server-daemon", BAKE_SERVER_DAEMON_DEFAULT_CFG);
/* set parameters from command line */
if(opts->listen_addr_str)
{
mochi_cfg_get_object(cfg, "margo", &margo_cfg);
mochi_cfg_get_object(margo_cfg, "mercury", &hg_cfg);
mochi_cfg_set_value_string(hg_cfg, "addr_str", opts->listen_addr_str);
}
if(opts->host_file)
mochi_cfg_set_value_string(cfg, "host_file", opts->host_file);
/* construct json to represent provider and target hierarchy */
/* If mplex_mode is "TARGETS", then there is one provider with N
* targets. If mplex_mode is "PROVIDERS", then there are N providers
* each with one target.
* NOTE: we just append here; there is no effort to determine if command
* line arguments have duplicated something that was already in the json
* NOTE: we also only apply the -p pipelining argument to
* providers that are specified on the command line.
*/
/* TODO: error handling below */
prov_array = json_object_get(cfg, "bake_providers");
if(opts->mplex_mode == MODE_TARGETS)
{
prov_cfg = json_object();
target_array = json_array();
for(i=0; i<opts->num_pools; i++)
{
json_array_append(target_array, json_string(opts->bake_pools[i]));
}
json_object_set(prov_cfg, "targets", target_array);
json_object_set(prov_cfg, "pipeline_enable", json_integer(opts->pipeline_enabled));
json_array_append(prov_array, prov_cfg);
}
else
{
for(i=0; i<opts->num_pools; i++)
{
prov_cfg = json_object();
target_array = json_array();
json_array_append(target_array, json_string(opts->bake_pools[i]));
json_object_set(prov_cfg, "targets", target_array);
json_object_set(prov_cfg, "pipeline_enable", json_integer(opts->pipeline_enabled));
json_array_append(prov_array, prov_cfg);
}
}
return(cfg);
}
static void parse_args(int argc, char **argv, struct options *opts)
{
int opt;
......@@ -47,13 +149,19 @@ static void parse_args(int argc, char **argv, struct options *opts)
memset(opts, 0, sizeof(*opts));
/* get options */
while((opt = getopt(argc, argv, "f:m:p")) != -1)
while((opt = getopt(argc, argv, "l:j:f:m:p")) != -1)
{
switch(opt)
{
case 'l':
opts->listen_addr_str = optarg;
break;
case 'f':
opts->host_file = optarg;
break;
case 'j':
opts->json_input = optarg;
break;
case 'm':
if(0 == strcmp(optarg, "targets"))
opts->mplex_mode = MODE_TARGETS;
......@@ -74,13 +182,12 @@ static void parse_args(int argc, char **argv, struct options *opts)
}
/* get required arguments after options */
if((argc - optind) < 2)
if((argc - optind) < 1)
{
usage(argc, argv);
exit(EXIT_FAILURE);
}
opts->num_pools = argc - optind - 1;
opts->listen_addr_str = argv[optind++];
opts->num_pools = argc - optind;
opts->bake_pools = calloc(opts->num_pools, sizeof(char*));
int i;
for(i=0; i < opts->num_pools; i++) {
......@@ -90,26 +197,45 @@ static void parse_args(int argc, char **argv, struct options *opts)
return;
}
int main(int argc, char **argv)
int main(int argc, char **argv)
{
struct options opts;
margo_instance_id mid;
int ret;
json_t* cfg = NULL;
json_t* margo_cfg = NULL;
char *cfg_str;
char *margo_cfg_str;
const char *host_file;
size_t prov_index, target_index;
json_t *prov_value, *target_value;
char *prov_cfg_string;
int i;
parse_args(argc, argv, &opts);
cfg = resolve_json(&opts);
if(!cfg)
{
fprintf(stderr, "Error: unable to resolve json and command line arguments.\n");
return(-1);
}
/* start margo */
/* use the main xstream for driving progress and executing rpc handlers */
mid = margo_init(opts.listen_addr_str, MARGO_SERVER_MODE, 0, -1);
mochi_cfg_get_object(cfg, "margo", &margo_cfg);
cfg_str = mochi_cfg_emit(margo_cfg, "margo");
mid = margo_init_json(cfg_str);
if(mid == MARGO_INSTANCE_NULL)
{
fprintf(stderr, "Error: margo_init()\n");
return(-1);
}
free(cfg_str);
margo_enable_remote_shutdown(mid);
if(opts.host_file)
mochi_cfg_get_value_string(cfg, "host_file", &host_file);
if(host_file && strlen(host_file))
{
/* write the server address to file if requested */
FILE *fp;
......@@ -136,7 +262,7 @@ int main(int argc, char **argv)
}
margo_addr_free(mid, self_addr);
fp = fopen(opts.host_file, "w");
fp = fopen(host_file, "w");
if(!fp)
{
perror("fopen");
......@@ -148,28 +274,36 @@ int main(int argc, char **argv)
fclose(fp);
}
/* initialize the BAKE server */
if(opts.mplex_mode == MODE_PROVIDERS) {
int i;
for(i=0; i< opts.num_pools; i++) {
bake_provider_t provider;
bake_target_id_t tid;
ret = bake_provider_register(mid, i+1,
BAKE_ABT_POOL_DEFAULT,
&provider);
if(ret != 0)
{
bake_perror( "Error: bake_provider_register()", ret);
margo_finalize(mid);
return(-1);
}
if(opts.pipeline_enabled)
bake_provider_set_conf(provider, "pipeline_enabled", "1");
/* TODO: error handling? */
json_array_foreach(
json_object_get(cfg, "bake_providers"), prov_index, prov_value)
{
bake_target_id_t tid;
ret = bake_provider_add_storage_target(provider, opts.bake_pools[i], &tid);
prov_cfg_string = mochi_cfg_emit(prov_value, NULL);
if(g_provider_array_size == MAX_PROVIDERS)
{
fprintf(stderr, "Error: hit provider limit of %d\n", MAX_PROVIDERS);
margo_finalize(mid);
return(-1);
}
ret = bake_provider_register_json(mid, prov_index+1,
BAKE_ABT_POOL_DEFAULT,
&g_provider_array[g_provider_array_size],
prov_cfg_string);
if(ret != 0)
{
bake_perror( "Error: bake_provider_register_json()", ret);
margo_finalize(mid);
return(-1);
}
free(prov_cfg_string);
json_array_foreach(
json_object_get(prov_value, "targets"), target_index, target_value)
{
ret = bake_provider_add_storage_target(
g_provider_array[g_provider_array_size], json_string_value(target_value), &tid);
if(ret != 0)
{
bake_perror("Error: bake_provider_add_storage_target()", ret);
......@@ -177,42 +311,37 @@ int main(int argc, char **argv)
return(-1);
}
printf("Provider %d managing new target at multiplex id %d\n", i, i+1);
printf("Provider %lu managing new target %s at multiplex id %lu\n", prov_index, json_string_value(target_value), prov_index+1);
}
g_provider_array_size++;
}
} else {
/* TODO: bundle the following stuff into a helper function so that it
* could be re-used in other situations
*/
int i;
bake_provider_t provider;
ret = bake_provider_register(mid, 1,
BAKE_ABT_POOL_DEFAULT,
&provider);
/* update top level json with current runtime settings from margo */
margo_cfg_str = margo_get_config(mid);
ret = mochi_cfg_set_object_by_string(cfg, "margo", margo_cfg_str);
free(margo_cfg_str);
if(ret != 0)
/* iterate through providers and populate json with current state */
json_array_clear(json_object_get(cfg, "bake_providers"));
for(i=0; i<g_provider_array_size; i++)
{
char* prov_cfg_string = bake_provider_get_config(g_provider_array[i]);
if(prov_cfg_string)
{
bake_perror("Error: bake_provider_register()", ret);
margo_finalize(mid);
return(-1);
}
if(opts.pipeline_enabled)
bake_provider_set_conf(provider, "pipeline_enabled", "1");
for(i=0; i < opts.num_pools; i++) {
bake_target_id_t tid;
ret = bake_provider_add_storage_target(provider, opts.bake_pools[i], &tid);
if(ret != 0)
{
bake_perror("Error: bake_provider_add_storage_target()", ret);
margo_finalize(mid);
return(-1);
}
printf("Provider 0 managing new target at multiplex id %d\n", 1);
mochi_cfg_append_array_by_string(cfg, "bake_providers", prov_cfg_string);
free(prov_cfg_string);
}
}
/* display full json for the daemon */
cfg_str = mochi_cfg_emit(cfg, "bake-server-daemon");
printf("%s\n", cfg_str);
free(cfg_str);
/* suspend until the BAKE server gets a shutdown signal from the client */
margo_wait_for_finalize(mid);
......
/*
* (C) 2015 The University of Chicago
*
*
* See COPYRIGHT in top-level directory.
*/
......@@ -10,6 +10,7 @@
#include <libpmemobj.h>
#include <unistd.h>
#include <fcntl.h>
#include <mochi-cfg.h>
#include <margo.h>
#include <margo-bulk-pool.h>
#ifdef USE_REMI
......@@ -42,15 +43,15 @@ DECLARE_MARGO_RPC_HANDLER(bake_remove_ult)
DECLARE_MARGO_RPC_HANDLER(bake_migrate_region_ult)
DECLARE_MARGO_RPC_HANDLER(bake_migrate_target_ult)
/* TODO: support different parameters per provider instance */
struct bake_provider_conf g_default_bake_provider_conf =
{
.pipeline_enable = 0,
.pipeline_npools = 4,
.pipeline_nbuffers_per_pool = 32,
.pipeline_first_buffer_size = 65536,
.pipeline_multiplier = 4
};
# define BAKE_PROV_DEFAULT_CFG \
"{" \
" \"version\": \"" PACKAGE_VERSION "\"," \
" \"pipeline_enable\": 0," \
" \"pipeline_npools\": 4," \
" \"pipeline_nbuffers_per_pool\": 32," \
" \"pipeline_first_buffer_size\": 65536," \
" \"pipeline_multiplier\": 4" \