Commit da26ffb5 authored by Philip Carns

convert pipeline config params to json

- also remove deprecated *set_conf* functions
parent d25334be
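
For context, a minimal sketch of how a provider is configured after this change: the pipeline parameters travel as a JSON document in bake_provider_init_info at registration time, replacing the removed bake_provider_set_conf() calls. The keys and defaults are the ones populated by validate_and_complete_config() later in this diff; the header name and the specific override shown here are assumptions for illustration.

#include <margo.h>
#include <bake-server.h> /* assumed header declaring bake_provider_register() */

static int start_pipelined_provider(margo_instance_id mid)
{
    bake_provider_t                provider;
    struct bake_provider_init_info bpargs = {0};

    /* only "pipeline_enable" is required to turn pipelining on; any of the
     * sizing keys (pipeline_npools, pipeline_nbuffers_per_pool,
     * pipeline_first_buffer_size, pipeline_multiplier) may be overridden,
     * otherwise the defaults from validate_and_complete_config() apply */
    bpargs.json_config = "{\"pipeline_enable\": true,"
                         " \"pipeline_first_buffer_size\": 65536}";

    return bake_provider_register(mid, 1, &bpargs, &provider);
}
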
......@@ -128,39 +128,6 @@ int bake_provider_count_storage_targets(bake_provider_t provider,
int bake_provider_list_storage_targets(bake_provider_t provider,
bake_target_id_t* targets);
/* TODO: the following configuration management functions would ideally be
* split off into a dedicated component. Treating this as a prototype for
* now.
*/
/**
* @brief Set configuration parameters as string key/value pairs
*
* @param provider Bake provider
* @param key Configuration key
* @param value Configuration value
*
* @return 0 on success, -1 on failure
*/
int bake_provider_set_conf(bake_provider_t provider,
const char* key,
const char* value);
/**
* @brief Set configuration parameters for a target.
*
* @param provider Bake provider
* @param tid Bake target id
* @param key Configuration key
* @param value Configuration value
*
* @return 0 on success, -1 on failure
*/
int bake_target_set_conf(bake_provider_t provider,
bake_target_id_t tid,
const char* key,
const char* value);
#ifdef __cplusplus
}
#endif
......
......@@ -10,8 +10,9 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <json-c/json.h>
#include <abt-io.h>
#include "bake-config.h"
#include "bake.h"
#include "bake-rpc.h"
......@@ -160,7 +161,8 @@ static int bake_file_backend_initialize(bake_provider_t provider,
ptrdiff_t d;
struct stat statbuf;
if (!provider->config.pipeline_enable) {
if (!json_object_get_boolean(
json_object_object_get(provider->json_cfg, "pipeline_enable"))) {
fprintf(stderr, "Error: The Bake file backend requires pipelining.\n");
fprintf(stderr,
" Enable pipelining with -p on the bake-server-daemon "
......
/*
* (C) 2020 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __BAKE_MACROS
#define __BAKE_MACROS

#include <json-c/json.h> /* json_object_* calls used by CONFIG_HAS_OR_CREATE */
#include <stdio.h>       /* fprintf() in the error path below */
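// json-c's json_type enum has no json_type_int64 entry, so alias it to
// json_type_int: this lets the "int64" type token used with the macro below
// expand to json_type_int for the type check while still selecting
// json_object_new_int64() when creating the default value.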
static const int json_type_int64 = json_type_int;
// Checks whether a JSON object has a particular key whose value is of the
// specified type (not array, object, or null). If the field does not exist,
// it is created with the provided default value. If the field exists but is
// not of the specified type, an error is printed and -1 is returned. After a
// call to this macro, __out is set to the created/found field.
#define CONFIG_HAS_OR_CREATE(__config, __type, __key, __value, __fullname, \
__out) \
do { \
__out = json_object_object_get(__config, __key); \
if (__out && !json_object_is_type(__out, json_type_##__type)) { \
fprintf(stderr, \
"\"%s\" in configuration but has an incorrect type " \
"(expected %s)", \
__fullname, #__type); \
return -1; \
} \
if (!__out) { \
__out = json_object_new_##__type(__value); \
json_object_object_add(__config, __key, __out); \
} \
} while (0)
#endif /* __BAKE_MACROS */
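
A short usage sketch of the macro above (the hypothetical apply_defaults() wrapper is for illustration only; the real call sites are in validate_and_complete_config() at the end of this commit):

#include <stdio.h>
#include <json-c/json.h>
#include "bake-macros.h"

static int apply_defaults(struct json_object* cfg)
{
    struct json_object* val;

    /* creates {"pipeline_npools": 4} if the key is missing; the macro makes
     * the enclosing function return -1 if the key exists with a non-integer
     * type */
    CONFIG_HAS_OR_CREATE(cfg, int64, "pipeline_npools", 4, "pipeline_npools",
                         val);

    /* val now points at the found or newly created field */
    printf("pipeline_npools = %d\n", json_object_get_int(val));
    return 0;
}
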
#include <assert.h>
#include <json-c/json.h>
#include "bake-config.h"
#include "bake.h"
#include "bake-rpc.h"
......@@ -193,7 +194,8 @@ static int write_transfer_data(margo_instance_id mid,
/* resolve addr, could be addr of rpc sender (normal case) or a third
* party (proxy write)
*/
if (provider->config.pipeline_enable == 0) {
if (!json_object_get_boolean(
json_object_object_get(provider->json_cfg, "pipeline_enable"))) {
/* normal path; no pipeline or intermediate buffers */
/* create bulk handle for local side of transfer */
......
......@@ -28,15 +28,6 @@ typedef struct {
UT_hash_handle hh;
} bake_target_t;
struct bake_provider_conf {
unsigned pipeline_enable; /* pipeline yes or no; implies intermediate
buffering */
unsigned pipeline_npools; /* number of preallocated buffer pools */
unsigned pipeline_nbuffers_per_pool; /* buffers per buffer pool */
unsigned pipeline_first_buffer_size; /* size of buffers in smallest pool */
unsigned pipeline_multiplier; /* factor size increase per pool */
};
typedef struct bake_provider {
margo_instance_id mid;
ABT_pool handler_pool; // pool used to run RPC handlers for this provider
......@@ -54,8 +45,7 @@ typedef struct bake_provider {
int owns_remi_provider;
#endif
struct bake_provider_conf config; /* configuration for transfers */
margo_bulk_poolset_t poolset; /* intermediate buffers, if used */
// list of RPC ids
hg_id_t rpc_create_id;
......
......@@ -156,7 +156,14 @@ int main(int argc, char** argv)
for (i = 0; i < opts.num_pools; i++) {
bake_provider_t provider;
bake_target_id_t tid;
struct bake_provider_init_info bpargs = {0};
char json_config[256] = {0};
if (opts.pipeline_enabled) {
sprintf(json_config, "{\"pipeline_enable\": true}");
bpargs.json_config = json_config;
}
ret = bake_provider_register(mid, i + 1, &bpargs, &provider);
if (ret != 0) {
......@@ -165,9 +172,6 @@ int main(int argc, char** argv)
return (-1);
}
if (opts.pipeline_enabled)
bake_provider_set_conf(provider, "pipeline_enabled", "1");
ret = bake_provider_add_storage_target(provider, opts.bake_pools[i],
&tid);
......@@ -185,7 +189,14 @@ int main(int argc, char** argv)
int i;
bake_provider_t provider;
struct bake_provider_init_info bpargs = {0};
char json_config[256] = {0};
if (opts.pipeline_enabled) {
sprintf(json_config, "{\"pipeline_enable\": true}");
bpargs.json_config = json_config;
}
ret = bake_provider_register(mid, 1, &bpargs, &provider);
if (ret != 0) {
......@@ -194,9 +205,6 @@ int main(int argc, char** argv)
return (-1);
}
if (opts.pipeline_enabled)
bake_provider_set_conf(provider, "pipeline_enabled", "1");
for (i = 0; i < opts.num_pools; i++) {
bake_target_id_t tid;
ret = bake_provider_add_storage_target(provider, opts.bake_pools[i],
......
......@@ -22,6 +22,7 @@
#include "bake-rpc.h"
#include "bake-timing.h"
#include "bake-provider.h"
#include "bake-macros.h"
extern bake_backend g_bake_pmem_backend;
extern bake_backend g_bake_file_backend;
......@@ -43,20 +44,13 @@ DECLARE_MARGO_RPC_HANDLER(bake_remove_ult)
DECLARE_MARGO_RPC_HANDLER(bake_migrate_region_ult)
DECLARE_MARGO_RPC_HANDLER(bake_migrate_target_ult)
/* TODO: support different parameters per provider instance */
struct bake_provider_conf g_default_bake_provider_conf
= {.pipeline_enable = 0,
.pipeline_npools = 4,
.pipeline_nbuffers_per_pool = 32,
.pipeline_first_buffer_size = 65536,
.pipeline_multiplier = 4};
/**
* Validates the format of the configuration and fills default values
* if they are not provided
*/
static int validate_and_complete_config(struct json_object* _config,
ABT_pool _progress_pool);
static int setup_poolset(bake_provider_t provider);
static bake_target_t* find_target_entry(bake_provider_t provider,
bake_target_id_t target_id)
......@@ -131,6 +125,8 @@ int bake_provider_register(margo_instance_id mid,
tmp_provider = calloc(1, sizeof(*tmp_provider));
if (!tmp_provider) return BAKE_ERR_ALLOCATION;
tmp_provider->json_cfg = config;
tmp_provider->mid = mid;
if (args.rpc_pool != NULL)
tmp_provider->handler_pool = args.rpc_pool;
......@@ -138,7 +134,15 @@ int bake_provider_register(margo_instance_id mid,
margo_get_handler_pool(mid, &(tmp_provider->handler_pool));
}
tmp_provider->config = g_default_bake_provider_conf;
ret = setup_poolset(tmp_provider);
if (ret != 0) {
fprintf(stderr, "Could not create poolset for pipelining");
json_object_put(config);
free(tmp_provider);
return ret;
}
/* create buffer poolset if needed for config */
/* Create rwlock */
ret = ABT_rwlock_create(&(tmp_provider->lock));
......@@ -1012,52 +1016,58 @@ static int bake_target_post_migration_callback(remi_fileset_t fileset,
#endif
static int set_conf_cb_pipeline_enabled(bake_provider_t provider,
const char* value)
static int setup_poolset(bake_provider_t provider)
{
int ret;
hg_return_t hret;
ret = sscanf(value, "%u", &provider->config.pipeline_enable);
if (ret != 1) return BAKE_ERR_INVALID_ARG;
if (provider->config.pipeline_enable) {
hret = margo_bulk_poolset_create(
provider->mid, provider->config.pipeline_npools,
provider->config.pipeline_nbuffers_per_pool,
provider->config.pipeline_first_buffer_size,
provider->config.pipeline_multiplier, HG_BULK_READWRITE,
&(provider->poolset));
if (hret != 0) return BAKE_ERR_MERCURY;
/* NOTE: this is called after validate_and_complete_config(), so we don't
* need extensive error checking on the json here
*/
/* nothing to do if pipelining is disabled */
if (!json_object_get_boolean(
json_object_object_get(provider->json_cfg, "pipeline_enable"))) {
return (0);
}
return BAKE_SUCCESS;
}
int bake_provider_set_conf(bake_provider_t provider,
const char* key,
const char* value)
{
/* TODO: make this more generic, manually issuing callbacks for
* particular keys right now.
*/
if (strcmp(key, "pipeline_enabled") == 0)
return set_conf_cb_pipeline_enabled(provider, value);
else
return BAKE_ERR_INVALID_ARG;
}
hret = margo_bulk_poolset_create(
provider->mid,
json_object_get_int(
json_object_object_get(provider->json_cfg, "pipeline_npools")),
json_object_get_int(json_object_object_get(
provider->json_cfg, "pipeline_nbuffers_per_pool")),
json_object_get_int(json_object_object_get(
provider->json_cfg, "pipeline_first_buffer_size")),
json_object_get_int(
json_object_object_get(provider->json_cfg, "pipeline_multiplier")),
HG_BULK_READWRITE, &(provider->poolset));
if (hret != 0) return BAKE_ERR_MERCURY;
int bake_target_set_conf(bake_provider_t provider,
bake_target_id_t tid,
const char* key,
const char* value)
{
// TODO
return 0;
return BAKE_SUCCESS;
}
static int validate_and_complete_config(struct json_object* _config,
ABT_pool _progress_pool)
{
/* TODO: fill this in */
struct json_object* val;
/* populate default pipeline settings if not specified already */
/* pipeline yes or no; implies intermediate buffering */
CONFIG_HAS_OR_CREATE(_config, boolean, "pipeline_enable", 0,
"pipeline_enable", val);
/* number of preallocated buffer pools */
CONFIG_HAS_OR_CREATE(_config, int64, "pipeline_npools", 4,
"pipeline_npools", val);
/* buffers per buffer pool */
CONFIG_HAS_OR_CREATE(_config, int64, "pipeline_nbuffers_per_pool", 32,
"pipeline_nbuffers_per_pool", val);
/* size of buffers in smallest pool */
CONFIG_HAS_OR_CREATE(_config, int64, "pipeline_first_buffer_size", 65536,
"pipeline_first_buffer_size", val);
/* factor size increase per pool */
CONFIG_HAS_OR_CREATE(_config, int64, "pipeline_multiplier", 4,
"pipeline_multiplier", val);
return (0);
}
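
As a quick sanity check of those defaults, and assuming the poolset semantics described by the removed bake_provider_conf comments (pipeline_npools pools of pipeline_nbuffers_per_pool buffers each, buffer size starting at pipeline_first_buffer_size and growing by pipeline_multiplier per pool), the preallocated intermediate buffer footprint works out to about 170 MiB:

#include <stdio.h>

int main(void)
{
    size_t total = 0, size = 65536;        /* pipeline_first_buffer_size */
    for (int pool = 0; pool < 4; pool++) { /* pipeline_npools */
        total += 32 * size;                /* pipeline_nbuffers_per_pool */
        size *= 4;                         /* pipeline_multiplier */
    }
    printf("%zu bytes\n", total); /* 178257920 bytes == 170 MiB */
    return 0;
}
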