Commit 8ab84eed authored by Philip Carns's avatar Philip Carns
Browse files

revert mochi-cfg/json changes

- in preparation for using json-c and/or bedrock instead
parent e1fbfc17
......@@ -2,7 +2,7 @@
# Process this file with autoconf to produce a configure script.
AC_PREREQ([2.63])
AC_INIT([bake], [0.6], [],[],[])
AC_INIT([bake], [0.3.4], [],[],[])
AC_CONFIG_MACRO_DIR([m4])
LT_INIT
......@@ -72,12 +72,6 @@ LIBS="$ABTIO_LIBS $LIBS"
CPPFLAGS="$ABTIO_CFLAGS $CPPFLAGS"
CFLAGS="$ABTIO_CFLAGS $CFLAGS"
PKG_CHECK_MODULES([MOCHICFG],[mochi-cfg],[],
[AC_MSG_ERROR([Could not find working mochi-cfg installation!])])
LIBS="$MOCHICFG_LIBS $LIBS"
CPPFLAGS="$MOCHICFG_CFLAGS $CPPFLAGS"
CFLAGS="$MOCHICFG_CFLAGS $CFLAGS"
# NOTE: See README.md if the following does not work for you; some versions of
# nvml/pmem install broken .pc files
PKG_CHECK_MODULES([LIBPMEMOBJ],[libpmemobj],[],
......
......@@ -43,6 +43,7 @@ int bake_makepool(
* @param[in] mid Margo instance identifier
* @param[in] provider_id provider id
* @param[in] pool Pool on which to run the RPC handlers
* @param[in] target_name path to PMEM backend file
* @param[out] provider resulting provider
* @returns 0 on success, -1 otherwise
*/
......@@ -52,22 +53,6 @@ int bake_provider_register(
ABT_pool pool,
bake_provider_t* provider);
/**
* Initializes a BAKE provider with json configuration.
*
* @param[in] mid Margo instance identifier
* @param[in] provider_id provider id
* @param[in] pool Pool on which to run the RPC handlers
* @param[out] provider resulting provider
* @returns 0 on success, -1 otherwise
*/
int bake_provider_register_json(
margo_instance_id mid,
uint16_t provider_id,
ABT_pool pool,
bake_provider_t* provider,
const char* json_cfg_string);
/**
* @brief Deregisters and destroys the provider.
*
......@@ -144,13 +129,10 @@ int bake_provider_list_storage_targets(
bake_provider_t provider,
bake_target_id_t* targets);
/**
* Retrieves complete configuration for provider, encoded as json
*
* @param [in] provider
* @returns null terminated string that must be free'd by caller
/* TODO: the following configuration management functions would ideally be
* split off into a dedicated component. Treating this as a prototype for
* now.
*/
char* bake_provider_get_config(bake_provider_t provider);
/**
* @brief Set configuration parameters as string key/value pairs
......
......@@ -93,6 +93,10 @@ typedef int (*bake_create_fileset_fn)(backend_context_t context,
remi_fileset_t* fileset);
#endif
typedef int (*bake_set_conf_fn)(backend_context_t context,
const char* key,
const char* value);
typedef struct bake_backend {
const char* name;
bake_backend_initialize_fn _initialize;
......@@ -112,6 +116,7 @@ typedef struct bake_backend {
#ifdef USE_REMI
bake_create_fileset_fn _create_fileset;
#endif
bake_set_conf_fn _set_conf;
} bake_backend;
typedef bake_backend* bake_backend_t;
......
......@@ -57,7 +57,6 @@ typedef struct {
int log_fd; /* file descriptor for log */
off_t log_offset; /* next available unused offset in log */
ABT_mutex log_offset_mutex; /* protects the above during concurrent region creation */
/* TODO: the abt-io instance should be per provider, not target */
abt_io_instance_id abtioi; /* abt-io instance used by this provider */
bake_root_t* file_root;
char* root;
......@@ -164,16 +163,12 @@ static int bake_file_backend_initialize(bake_provider_t provider,
const char *tmp;
ptrdiff_t d;
struct stat statbuf;
json_t *abt_io_cfg;
char *abt_io_cfg_string = NULL;
char *abt_io_cfg_string_new = NULL;
if(provider->poolset == MARGO_BULK_POOLSET_NULL)
if(!provider->config.pipeline_enable)
{
fprintf(stderr, "Error: The Bake file backend requires pipelining.\n");
fprintf(stderr, " Enable pipelining with -p on the bake-server-daemon command line,\n");
fprintf(stderr, " programmatically with bake_provider_set_conf(provider, \"pipeline_enabled\", \"1\"), or\n");
fprintf(stderr, " via json configuration.\n");
fprintf(stderr, " Enable pipelining with -p on the bake-server-daemon command line or\n");
fprintf(stderr, " programmatically with bake_provider_set_conf(provider, \"pipeline_enabled\", \"1\")\n");
return(BAKE_ERR_INVALID_ARG);
}
......@@ -184,28 +179,15 @@ static int bake_file_backend_initialize(bake_provider_t provider,
d = tmp - path;
new_entry->root = strndup(path, d);
/* Look for abt-io object within this provider config. If
* present, use it. If not then initialize abt-io with defaults.
*/
mochi_cfg_get_object(provider->prov_cfg, "abt_io", &abt_io_cfg);
if(abt_io_cfg)
abt_io_cfg_string = mochi_cfg_emit(abt_io_cfg, "abt-io");
if(abt_io_cfg_string)
new_entry->abtioi = abt_io_init_json(abt_io_cfg_string);
else
new_entry->abtioi = abt_io_init(16);
/* initialize an abt-io instance just for this target */
/* TODO: make number of backing threads tunable */
new_entry->abtioi = abt_io_init(16);
if(!new_entry->abtioi)
{
ret = BAKE_ERR_IO;
goto error_cleanup;
}
/* query resulting runtime configuration from abt-io, and update that
* sub-section of the provider configuration
*/
abt_io_cfg_string_new = abt_io_get_config(new_entry->abtioi);
mochi_cfg_set_object_by_string(provider->prov_cfg, "abt-io", abt_io_cfg_string_new);
new_entry->log_fd = abt_io_open(new_entry->abtioi,
path, O_RDWR|O_DIRECT, 0);
if(new_entry->log_fd < 0) {
......@@ -601,6 +583,13 @@ static int bake_file_migrate_region(backend_context_t context,
return BAKE_ERR_OP_UNSUPPORTED;
}
/* Configuration hook for the file backend.  Currently a no-op: every
 * key/value pair is accepted and discarded, and success (0) is always
 * reported.  Exists so the backend fills the _set_conf slot of
 * bake_backend (see g_bake_file_backend). */
static int bake_file_set_conf(backend_context_t context,
    const char* key,
    const char* value)
{
    /* silence unused-parameter warnings until real handling is added */
    (void)context;
    (void)key;
    (void)value;

    return 0;
}
#ifdef USE_REMI
static int bake_file_create_fileset(backend_context_t context,
remi_fileset_t* fileset)
......@@ -649,6 +638,7 @@ bake_backend g_bake_file_backend = {
#ifdef USE_REMI
._create_fileset = bake_file_create_fileset,
#endif
._set_conf = bake_file_set_conf
};
/* common utility function for relaying data in read_bulk/write_bulk */
......
......@@ -36,7 +36,7 @@ int parse_size(char *str, size_t *size_out)
const char *suffixes[] = { "B", "K", "M", "G", "T", "P" };
size_t size_mults[] = { 1ULL, 1ULL << 10, 1ULL << 20, 1ULL << 30, 1ULL << 40, 1ULL << 50 };
size_t size;
char suff[3] = {0};
char suff[2] = {0};
int i;
int ret;
......
......@@ -200,7 +200,7 @@ static int write_transfer_data(
/* resolve addr, could be addr of rpc sender (normal case) or a third
* party (proxy write)
*/
if(provider->poolset == MARGO_BULK_POOLSET_NULL)
if(provider->config.pipeline_enable == 0)
{
/* normal path; no pipeline or intermediate buffers */
......@@ -700,6 +700,13 @@ error:
}
#endif
/* Configuration hook for the pmem backend.  Placeholder implementation:
 * it ignores the supplied key/value pair and unconditionally returns 0,
 * so the _set_conf member of g_bake_pmem_backend is always populated. */
static int bake_pmem_set_conf(backend_context_t context,
    const char* key,
    const char* value)
{
    /* parameters intentionally unused for now */
    (void)context;
    (void)key;
    (void)value;

    return 0;
}
bake_backend g_bake_pmem_backend = {
.name = "pmem",
._initialize = bake_pmem_backend_initialize,
......@@ -719,6 +726,7 @@ bake_backend g_bake_pmem_backend = {
#ifdef USE_REMI
._create_fileset = bake_pmem_create_fileset,
#endif
._set_conf = bake_pmem_set_conf
};
static void xfer_ult(void *_args)
......
......@@ -11,7 +11,6 @@
#include <libpmemobj.h>
#include <unistd.h>
#include <fcntl.h>
#include <mochi-cfg.h>
#include <margo.h>
#include <margo-bulk-pool.h>
#ifdef USE_REMI
......@@ -30,6 +29,15 @@ typedef struct
UT_hash_handle hh;
} bake_target_t;
/* Per-provider tuning parameters for bulk data transfers.  The pipeline_*
 * fields control the intermediate buffer pools used when pipelining is on
 * (presumably set via bake_provider_set_conf(), e.g. "pipeline_enabled" --
 * TODO confirm against the provider implementation). */
struct bake_provider_conf
{
    unsigned pipeline_enable; /* pipeline yes or no; implies intermediate buffering */
    unsigned pipeline_npools; /* number of preallocated buffer pools */
    unsigned pipeline_nbuffers_per_pool; /* buffers per buffer pool */
    unsigned pipeline_first_buffer_size; /* size of buffers in smallest pool */
    unsigned pipeline_multiplier; /* factor size increase per pool */
};
typedef struct bake_provider
{
margo_instance_id mid;
......@@ -47,7 +55,7 @@ typedef struct bake_provider
int owns_remi_provider;
#endif
json_t *prov_cfg; /* provider-specific configuration */
struct bake_provider_conf config; /* configuration for transfers */
margo_bulk_poolset_t poolset; /* intermediate buffers, if used */
// list of RPC ids
......
/*
* (C) 2015 The University of Chicago
*
*
* See COPYRIGHT in top-level directory.
*/
......@@ -10,28 +10,8 @@
#include <assert.h>
#include <unistd.h>
#include <margo.h>
#include <mochi-cfg.h>
#include <bake-server.h>
/* NOTE: in the following we don't specify all of the sub-component
* parameters; just things that might be overridden by command line or are
* mandatory for this daemon.
*
* "bake_providers" is an array to support multiple providers, each possibly
 * has its own configuration
*
* "targets" is also an array to support multiple targets per provider
*/
# define BAKE_SERVER_DAEMON_DEFAULT_CFG \
"{ \"bake-server-daemon\": {" \
" \"host_file\": \"\"," \
" \"bake_providers\": []," \
" \"margo\": {" \
" \"mercury\": {" \
" \"addr_str\": \"na+sm://\"," \
" \"server_mode\": 1" \
"}}}}"
typedef enum {
MODE_TARGETS = 0,
MODE_PROVIDERS = 1
......@@ -39,7 +19,6 @@ typedef enum {
struct options
{
char *json_input;
char *listen_addr_str;
unsigned num_pools;
char **bake_pools;
......@@ -48,100 +27,19 @@ struct options
mplex_mode_t mplex_mode;
};
#define MAX_PROVIDERS 256
static bake_provider_t g_provider_array[MAX_PROVIDERS];
static int g_provider_array_size = 0;
static void usage(int argc, char **argv)
{
fprintf(stderr, "Usage: bake-server-daemon [OPTIONS] <bake_pool1> <bake_pool2> ...\n");
fprintf(stderr, "Usage: bake-server-daemon [OPTIONS] <listen_addr> <bake_pool1> <bake_pool2> ...\n");
fprintf(stderr, " listen_addr is the Mercury address to listen on\n");
fprintf(stderr, " bake_pool is the path to the BAKE pool\n");
fprintf(stderr, " (prepend pmem: or file: to specify backend format)\n");
fprintf(stderr, " [-l listen_addr] is the Mercury address to listen on\n");
fprintf(stderr, " [-f filename] to write the server address to a file\n");
fprintf(stderr, " [-m mode] multiplexing mode (providers or targets) for managing multiple pools (default is targets)\n");
fprintf(stderr, " [-p] enable pipelining\n");
fprintf(stderr, " [-j filename] json configuration file \n");
fprintf(stderr, "Example: ./bake-server-daemon tcp://localhost:1234 /dev/shm/foo.dat /dev/shm/bar.dat\n");
return;
}
static json_t* resolve_json(struct options *opts)
{
json_t* cfg;
json_t* margo_cfg;
json_t* hg_cfg;
json_t* prov_array;
json_t* prov_cfg;
json_t* target_array;
int i;
/* Concept: we have 3 sources of configuration parameters to consider,
* in order of precedence:
* - explicit command line arguments
* - json configuration file
* - default parameters
*
* We accomplish this by first parsing config file (if present) and
* setting remaining defaults. Then we explicitly set extra parameters
* that may have been set on command line.
*/
/* read json input config file and populate missing defaults */
if(opts->json_input)
cfg = mochi_cfg_get_component_file(opts->json_input, "bake-server-daemon", BAKE_SERVER_DAEMON_DEFAULT_CFG);
else
cfg = mochi_cfg_get_component("{\"bake-server-daemon\":{}}", "bake-server-daemon", BAKE_SERVER_DAEMON_DEFAULT_CFG);
/* set parameters from command line */
if(opts->listen_addr_str)
{
mochi_cfg_get_object(cfg, "margo", &margo_cfg);
mochi_cfg_get_object(margo_cfg, "mercury", &hg_cfg);
mochi_cfg_set_value_string(hg_cfg, "addr_str", opts->listen_addr_str);
}
if(opts->host_file)
mochi_cfg_set_value_string(cfg, "host_file", opts->host_file);
/* construct json to represent provider and target hierarchy */
/* If mplex_mode is "TARGETS", then there is one provider with N
* targets. If mplex_mode is "PROVIDERS", then there are N providers
* each with one target.
* NOTE: we just append here; there is no effort to determine if command
* line arguments have duplicated something that was already in the json
* NOTE: we also only apply the -p pipelining argument to
* providers that are specified on the command line.
*/
/* TODO: error handling below */
prov_array = json_object_get(cfg, "bake_providers");
if(opts->mplex_mode == MODE_TARGETS)
{
prov_cfg = json_object();
target_array = json_array();
for(i=0; i<opts->num_pools; i++)
{
json_array_append(target_array, json_string(opts->bake_pools[i]));
}
json_object_set(prov_cfg, "targets", target_array);
json_object_set(prov_cfg, "pipeline_enable", json_integer(opts->pipeline_enabled));
json_array_append(prov_array, prov_cfg);
}
else
{
for(i=0; i<opts->num_pools; i++)
{
prov_cfg = json_object();
target_array = json_array();
json_array_append(target_array, json_string(opts->bake_pools[i]));
json_object_set(prov_cfg, "targets", target_array);
json_object_set(prov_cfg, "pipeline_enable", json_integer(opts->pipeline_enabled));
json_array_append(prov_array, prov_cfg);
}
}
return(cfg);
}
static void parse_args(int argc, char **argv, struct options *opts)
{
int opt;
......@@ -149,19 +47,13 @@ static void parse_args(int argc, char **argv, struct options *opts)
memset(opts, 0, sizeof(*opts));
/* get options */
while((opt = getopt(argc, argv, "l:j:f:m:p")) != -1)
while((opt = getopt(argc, argv, "f:m:p")) != -1)
{
switch(opt)
{
case 'l':
opts->listen_addr_str = optarg;
break;
case 'f':
opts->host_file = optarg;
break;
case 'j':
opts->json_input = optarg;
break;
case 'm':
if(0 == strcmp(optarg, "targets"))
opts->mplex_mode = MODE_TARGETS;
......@@ -182,12 +74,13 @@ static void parse_args(int argc, char **argv, struct options *opts)
}
/* get required arguments after options */
if((argc - optind) < 1)
if((argc - optind) < 2)
{
usage(argc, argv);
exit(EXIT_FAILURE);
}
opts->num_pools = argc - optind;
opts->num_pools = argc - optind - 1;
opts->listen_addr_str = argv[optind++];
opts->bake_pools = calloc(opts->num_pools, sizeof(char*));
int i;
for(i=0; i < opts->num_pools; i++) {
......@@ -197,54 +90,26 @@ static void parse_args(int argc, char **argv, struct options *opts)
return;
}
int main(int argc, char **argv)
int main(int argc, char **argv)
{
struct options opts;
margo_instance_id mid;
int ret;
json_t* cfg = NULL;
json_t* margo_cfg = NULL;
char *cfg_str;
char *margo_cfg_str;
const char *host_file;
size_t prov_index, target_index;
json_t *prov_value, *target_value;
char *prov_cfg_string;
int i;
parse_args(argc, argv, &opts);
cfg = resolve_json(&opts);
if(!cfg)
{
fprintf(stderr, "Error: unable to resolve json and command line arguments.\n");
return(-1);
}
/* start margo */
mochi_cfg_get_object(cfg, "margo", &margo_cfg);
cfg_str = mochi_cfg_emit(margo_cfg, "margo");
struct margo_init_info mid_info = {
.json_config = cfg_str,
.progress_pool = ABT_POOL_NULL,
.rpc_pool = ABT_POOL_NULL,
.hg_class = NULL,
.hg_context = NULL,
.hg_init_info = NULL
};
mid = margo_init_ext(opts.listen_addr_str, MARGO_SERVER_MODE, &mid_info);
/* use the main xstream for driving progress and executing rpc handlers */
mid = margo_init(opts.listen_addr_str, MARGO_SERVER_MODE, 0, -1);
if(mid == MARGO_INSTANCE_NULL)
{
fprintf(stderr, "Error: margo_init()\n");
mochi_cfg_release_component(cfg);
return(-1);
}
free(cfg_str);
margo_enable_remote_shutdown(mid);
mochi_cfg_get_value_string(cfg, "host_file", &host_file);
if(host_file && strlen(host_file))
if(opts.host_file)
{
/* write the server address to file if requested */
FILE *fp;
......@@ -259,7 +124,6 @@ int main(int argc, char **argv)
{
fprintf(stderr, "Error: margo_addr_self()\n");
margo_finalize(mid);
mochi_cfg_release_component(cfg);
return(-1);
}
hret = margo_addr_to_string(mid, self_addr_str, &self_addr_str_sz, self_addr);
......@@ -268,17 +132,15 @@ int main(int argc, char **argv)
fprintf(stderr, "Error: margo_addr_to_string()\n");
margo_addr_free(mid, self_addr);
margo_finalize(mid);
mochi_cfg_release_component(cfg);
return(-1);
}
margo_addr_free(mid, self_addr);
fp = fopen(host_file, "w");
fp = fopen(opts.host_file, "w");
if(!fp)
{
perror("fopen");
margo_finalize(mid);
mochi_cfg_release_component(cfg);
return(-1);
}
......@@ -286,81 +148,74 @@ int main(int argc, char **argv)
fclose(fp);
}
/* TODO: error handling? */
json_array_foreach(
json_object_get(cfg, "bake_providers"), prov_index, prov_value)
{
bake_target_id_t tid;
/* initialize the BAKE server */
if(opts.mplex_mode == MODE_PROVIDERS) {
int i;
for(i=0; i< opts.num_pools; i++) {
bake_provider_t provider;
bake_target_id_t tid;
ret = bake_provider_register(mid, i+1,
BAKE_ABT_POOL_DEFAULT,
&provider);
prov_cfg_string = mochi_cfg_emit(prov_value, NULL);
if(g_provider_array_size == MAX_PROVIDERS)
{
fprintf(stderr, "Error: hit provider limit of %d\n", MAX_PROVIDERS);
margo_finalize(mid);
mochi_cfg_release_component(cfg);
return(-1);
}
ret = bake_provider_register_json(mid, prov_index+1,
BAKE_ABT_POOL_DEFAULT,
&g_provider_array[g_provider_array_size],
prov_cfg_string);
if(ret != 0)
{
bake_perror( "Error: bake_provider_register_json()", ret);
margo_finalize(mid);
mochi_cfg_release_component(cfg);
return(-1);
}
free(prov_cfg_string);
if(ret != 0)
{
bake_perror( "Error: bake_provider_register()", ret);
margo_finalize(mid);
return(-1);
}
if(opts.pipeline_enabled)
bake_provider_set_conf(provider, "pipeline_enabled", "1");
ret = bake_provider_add_storage_target(provider, opts.bake_pools[i], &tid);
json_array_foreach(
json_object_get(prov_value, "targets"), target_index, target_value)
{
ret = bake_provider_add_storage_target(
g_provider_array[g_provider_array_size], json_string_value(target_value), &tid);
if(ret != 0)
{
bake_perror("Error: bake_provider_add_storage_target()", ret);
margo_finalize(mid);
mochi_cfg_release_component(cfg);
return(-1);
}
printf("Provider %lu managing new target %s at multiplex id %lu\n", prov_index, json_string_value(target_value), prov_index+1);
printf("Provider %d managing new target at multiplex id %d\n", i, i+1);
}
g_provider_array_size++;
}
/* TODO: bundle the following stuff into a helper function so that it
* could be re-used in other situations
*/
} else {
/* update top level json with current runtime settings from margo */
margo_cfg_str = margo_get_config(mid);
ret = mochi_cfg_set_object_by_string(cfg, "margo", margo_cfg_str);
free(margo_cfg_str);
int i;
bake_provider_t provider;
ret = bake_provider_register(mid, 1,
BAKE_ABT_POOL_DEFAULT,
&provider);
/* iterate through providers and populate json with current state */
json_array_clear(json_object_get(cfg, "bake_providers"));
for(i=0; i<g_provider_array_size; i++)
{
char* prov_cfg_string = bake_provider_get_config(g_provider_array[i]);
if(prov_cfg_string)
if(ret != 0)
{
mochi_cfg_append_array_by_string(cfg, "bake_providers", prov_cfg_string);
free(prov_cfg_string);
bake_perror("Error: bake_provider_register()", ret);
margo_finalize(mid);
return(-1);
}
}
/* display full json for the daemon */
cfg_str = mochi_cfg_emit(cfg, "bake-server-daemon");
printf("%s\n", cfg_str);
free(cfg_str);
if(opts.pipeline_enabled)
bake_provider_set_conf(provider, "pipeline_enabled", "1");
for(i=0; i < opts.num_pools; i++) {
bake_target_id_t tid;
ret = bake_provider_add_storage_target(provider, opts.bake_pools[i], &tid);
if(ret != 0)
{
bake_perror("Error: bake_provider_add_storage_target()", ret);
margo_finalize(mid);
return(-1);
}
printf("Provider 0 managing new target at multiplex id %d\n", 1);
}