Commit 7835a534 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

finished implementing multi-target

parent 55295b22
......@@ -38,21 +38,86 @@ int bake_makepool(
mode_t pool_mode);
/**
* Initializes a BAKE server instance.
* Initializes a BAKE provider.
*
* @param[in] mid Margo instance identifier
* @param[in] mplex_id Multiplex id
* @param[in] pool Pool on which to run the RPC handlers
* @param[in] pool_name path to PMEM backend file
* @param[in] target_name path to PMEM backend file
* @param[out] provider resulting provider
* @returns 0 on success, -1 otherwise
*/
int bake_provider_register(
margo_instance_id mid,
uint32_t mplex_id,
ABT_pool pool,
const char *pool_name,
bake_provider_t* provider);
margo_instance_id mid,
uint8_t mplex_id,
ABT_pool pool,
bake_provider_t* provider);
/**
* Makes the provider start managing a target.
* The target must have been previously created with bake_makepool,
* and it should not be managed by another provider (whether in this
* proccess or another).
*
* @param provider Bake provider
* @param target_name path to pmem target
* @param target_id resulting id identifying the target
*
* @return 0 on success, -1 on failure
*/
int bake_provider_add_storage_target(
bake_provider_t provider,
const char *target_name,
bake_target_id_t* target_id);
/**
* Makes the provider stop managing a target.
*
* @param provider Bake provider
* @param target_id id of the target to remove
*
* @return 0 on success, -1 on failure
*/
int bake_provider_remove_storage_target(
bake_provider_t provider,
bake_target_id_t target_id);
/**
* Removes all the targets associated with a provider.
*
* @param provider Bake provider
*
* @return 0 on success, -1 on failure
*/
int bake_provider_remove_all_storage_targets(
bake_provider_t provider);
/**
* Returns the number of targets that this provider manages.
*
* @param provider Bake provider
* @param num_targets resulting number of targets
*
* @return 0 on success, -1 on failure
*/
int bake_provider_count_storage_targets(
bake_provider_t provider,
uint64_t* num_targets);
/**
* List the target ids of the targets managed by this provider.
* The targets array must be pre-allocated with at least enough
* space to hold all the targets (use bake_provider_count_storage_targets
* to know how many storage targets are managed).
*
* @param provider Bake provider
* @param targets resulting targer ids
*
* @return 0 on success, -1 on failure
*/
int bake_provider_list_storage_targets(
bake_provider_t provider,
bake_target_id_t* targets);
#ifdef __cplusplus
}
......
......@@ -48,40 +48,61 @@ static int bake_client_register(bake_client_t client, margo_instance_id mid)
{
client->mid = mid;
/* register RPCs */
client->bake_probe_id =
MARGO_REGISTER(mid, "bake_probe_rpc",
bake_probe_in_t, bake_probe_out_t, NULL);
client->bake_shutdown_id =
MARGO_REGISTER(mid, "bake_shutdown_rpc",
void, void, NULL);
client->bake_create_id =
MARGO_REGISTER(mid, "bake_create_rpc",
bake_create_in_t, bake_create_out_t, NULL);
client->bake_write_id =
MARGO_REGISTER(mid, "bake_write_rpc",
bake_write_in_t, bake_write_out_t, NULL);
client->bake_eager_write_id =
MARGO_REGISTER(mid, "bake_eager_write_rpc",
bake_eager_write_in_t, bake_eager_write_out_t, NULL);
client->bake_eager_read_id =
MARGO_REGISTER(mid, "bake_eager_read_rpc",
bake_eager_read_in_t, bake_eager_read_out_t, NULL);
client->bake_persist_id =
MARGO_REGISTER(mid, "bake_persist_rpc",
bake_persist_in_t, bake_persist_out_t, NULL);
client->bake_create_write_persist_id =
MARGO_REGISTER(mid, "bake_create_write_persist_rpc",
bake_create_write_persist_in_t, bake_create_write_persist_out_t, NULL);
client->bake_get_size_id =
MARGO_REGISTER(mid, "bake_get_size_rpc",
bake_get_size_in_t, bake_get_size_out_t, NULL);
client->bake_read_id =
MARGO_REGISTER(mid, "bake_read_rpc",
bake_read_in_t, bake_read_out_t, NULL);
client->bake_noop_id =
MARGO_REGISTER(mid, "bake_noop_rpc",
void, void, NULL);
/* check if RPCs have already been registered */
hg_bool_t flag;
hg_id_t id;
margo_registered_name(mid, "bake_probe_rpc", &id, &flag);
if(flag == HG_TRUE) { /* RPCs already registered */
margo_registered_name(mid, "bake_probe_rpc", &client->bake_probe_id, &flag);
margo_registered_name(mid, "bake_shutdown_rpc", &client->bake_shutdown_id, &flag);
margo_registered_name(mid, "bake_create_rpc", &client->bake_create_id, &flag);
margo_registered_name(mid, "bake_write_rpc", &client->bake_write_id, &flag);
margo_registered_name(mid, "bake_eager_write_rpc", &client->bake_eager_write_id, &flag);
margo_registered_name(mid, "bake_eager_read_rpc", &client->bake_eager_read_id, &flag);
margo_registered_name(mid, "bake_persist_rpc", &client->bake_persist_id, &flag);
margo_registered_name(mid, "bake_create_write_persist_rpc", &client->bake_create_write_persist_id, &flag);
margo_registered_name(mid, "bake_get_size_rpc", &client->bake_get_size_id, &flag);
margo_registered_name(mid, "bake_read_rpc", &client->bake_read_id, &flag);
margo_registered_name(mid, "bake_noop_rpc", &client->bake_noop_id, &flag);
} else { /* RPCs not already registered */
client->bake_probe_id =
MARGO_REGISTER(mid, "bake_probe_rpc",
bake_probe_in_t, bake_probe_out_t, NULL);
client->bake_shutdown_id =
MARGO_REGISTER(mid, "bake_shutdown_rpc",
void, void, NULL);
client->bake_create_id =
MARGO_REGISTER(mid, "bake_create_rpc",
bake_create_in_t, bake_create_out_t, NULL);
client->bake_write_id =
MARGO_REGISTER(mid, "bake_write_rpc",
bake_write_in_t, bake_write_out_t, NULL);
client->bake_eager_write_id =
MARGO_REGISTER(mid, "bake_eager_write_rpc",
bake_eager_write_in_t, bake_eager_write_out_t, NULL);
client->bake_eager_read_id =
MARGO_REGISTER(mid, "bake_eager_read_rpc",
bake_eager_read_in_t, bake_eager_read_out_t, NULL);
client->bake_persist_id =
MARGO_REGISTER(mid, "bake_persist_rpc",
bake_persist_in_t, bake_persist_out_t, NULL);
client->bake_create_write_persist_id =
MARGO_REGISTER(mid, "bake_create_write_persist_rpc",
bake_create_write_persist_in_t, bake_create_write_persist_out_t, NULL);
client->bake_get_size_id =
MARGO_REGISTER(mid, "bake_get_size_rpc",
bake_get_size_in_t, bake_get_size_out_t, NULL);
client->bake_read_id =
MARGO_REGISTER(mid, "bake_read_rpc",
bake_read_in_t, bake_read_out_t, NULL);
client->bake_noop_id =
MARGO_REGISTER(mid, "bake_noop_rpc",
void, void, NULL);
}
return(0);
}
......@@ -160,10 +181,11 @@ int bake_probe(
if(max_targets == 0) {
*num_targets = out.num_targets;
} else {
uint64_t s = *num_targets > max_targets ? max_targets : *num_targets;
uint64_t s = out.num_targets > max_targets ? max_targets : out.num_targets;
if(s > 0) {
memcpy(bti, out.targets, sizeof(*bti)*s);
}
*num_targets = s;
}
}
......@@ -440,7 +462,8 @@ int bake_create(
}
ret = out.ret;
*rid = out.rid;
if(ret == 0)
*rid = out.rid;
margo_free_output(handle, &out);
margo_destroy(handle);
......@@ -519,7 +542,6 @@ int bake_create_write_persist(
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_create_write_persist_id, &handle);
margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS)
{
......@@ -527,6 +549,8 @@ int bake_create_write_persist(
return(-1);
}
margo_set_target_id(handle, provider->mplex_id);
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS)
{
......
......@@ -13,12 +13,18 @@
#include <libpmemobj.h>
#include <bake-server.h>
typedef enum {
MODE_TARGETS = 0,
MODE_PROVIDERS = 1
} mplex_mode_t;
struct options
{
char *listen_addr_str;
unsigned num_pools;
char **bake_pools;
char *host_file;
mplex_mode_t mplex_mode;
};
static void usage(int argc, char **argv)
......@@ -27,6 +33,7 @@ static void usage(int argc, char **argv)
fprintf(stderr, " listen_addr is the Mercury address to listen on\n");
fprintf(stderr, " bake_pool is the path to the BAKE pool\n");
fprintf(stderr, " [-f filename] to write the server address to a file\n");
fprintf(stderr, " [-m mode] multiplexing mode (providers or targets) for managing multiple pools (default is targets)\n");
fprintf(stderr, "Example: ./bake-server-daemon tcp://localhost:1234 /dev/shm/foo.dat /dev/shm/bar.dat\n");
return;
}
......@@ -38,13 +45,23 @@ static void parse_args(int argc, char **argv, struct options *opts)
memset(opts, 0, sizeof(*opts));
/* get options */
while((opt = getopt(argc, argv, "f:")) != -1)
while((opt = getopt(argc, argv, "f:m:")) != -1)
{
switch(opt)
{
case 'f':
opts->host_file = optarg;
break;
case 'm':
if(0 == strcmp(optarg, "targets"))
opts->mplex_mode = MODE_TARGETS;
else if(0 == strcmp(optarg, "providers"))
opts->mplex_mode = MODE_PROVIDERS;
else {
fprintf(stderr, "Unrecognized multiplexing mode \"%s\"\n", optarg);
exit(EXIT_FAILURE);
}
break;
default:
usage(argc, argv);
exit(EXIT_FAILURE);
......@@ -125,18 +142,62 @@ int main(int argc, char **argv)
}
/* initialize the BAKE server */
int i;
for(i=0; i< opts.num_pools; i++) {
ret = bake_provider_register(mid, i+1,
BAKE_ABT_POOL_DEFAULT, opts.bake_pools[i],
BAKE_PROVIDER_IGNORE);
}
if(opts.mplex_mode == MODE_PROVIDERS) {
int i;
for(i=0; i< opts.num_pools; i++) {
bake_provider_t provider;
bake_target_id_t tid;
ret = bake_provider_register(mid, i+1,
BAKE_ABT_POOL_DEFAULT,
&provider);
if(ret != 0)
{
fprintf(stderr, "Error: bake_provider_register()\n");
margo_finalize(mid);
return(-1);
}
ret = bake_provider_add_storage_target(provider, opts.bake_pools[i], &tid);
if(ret != 0)
{
fprintf(stderr, "Error: bake_provider_add_storage_target()\n");
margo_finalize(mid);
return(-1);
}
printf("Provider %d managing new target at multiplex id %d\n", i, i+1);
}
if(ret != 0)
{
fprintf(stderr, "Error: bake_provider_register()\n");
margo_finalize(mid);
return(-1);
} else {
int i;
bake_provider_t provider;
ret = bake_provider_register(mid, 1,
BAKE_ABT_POOL_DEFAULT,
&provider);
if(ret != 0)
{
fprintf(stderr, "Error: bake_provider_register()\n");
margo_finalize(mid);
return(-1);
}
for(i=0; i < opts.num_pools; i++) {
bake_target_id_t tid;
ret = bake_provider_add_storage_target(provider, opts.bake_pools[i], &tid);
if(ret != 0)
{
fprintf(stderr, "Error: bake_provider_add_storage_target()\n");
margo_finalize(mid);
return(-1);
}
printf("Provider %d managing new target at multiplex id %d\n", i, i+1);
}
}
/* suspend until the BAKE server gets a shutdown signal from the client */
......
......@@ -9,6 +9,7 @@
#include <assert.h>
#include <libpmemobj.h>
#include <bake-server.h>
#include "uthash.h"
#include "bake-rpc.h"
/* definition of BAKE root data structure (just a uuid for now) */
......@@ -16,7 +17,7 @@ typedef struct
{
bake_target_id_t pool_id;
} bake_root_t;
/* definition of internal BAKE region_id_t identifier for libpmemobj back end */
typedef struct
{
......@@ -24,18 +25,26 @@ typedef struct
uint64_t size;
} pmemobj_region_id_t;
typedef struct
{
PMEMobjpool* pmem_pool;
bake_root_t* pmem_root;
bake_target_id_t target_id;
UT_hash_handle hh;
} bake_pmem_entry_t;
typedef struct bake_server_context_t
{
PMEMobjpool *pmem_pool;
bake_root_t *pmem_root;
uint64_t num_targets;
bake_pmem_entry_t* targets;
} bake_server_context_t;
static void bake_server_finalize_cb(void *data);
int bake_makepool(
const char *pool_name,
size_t pool_size,
mode_t pool_mode)
const char *pool_name,
size_t pool_size,
mode_t pool_mode)
{
PMEMobjpool *pool;
PMEMoid root_oid;
......@@ -67,103 +76,184 @@ int bake_makepool(
}
int bake_provider_register(
margo_instance_id mid,
uint32_t mplex_id,
ABT_pool abt_pool,
const char *pool_name,
bake_provider_t* provider)
margo_instance_id mid,
uint8_t mplex_id,
ABT_pool abt_pool,
bake_provider_t* provider)
{
PMEMobjpool *pool;
PMEMoid root_oid;
bake_root_t *root;
bake_server_context_t *tmp_svr_ctx;
tmp_svr_ctx = calloc(1,sizeof(*tmp_svr_ctx));
if(!tmp_svr_ctx)
return(-1);
/* open the given pmem pool */
pool = pmemobj_open(pool_name, NULL);
if(!pool)
/* check if a provider with the same multiplex id already exists */
{
fprintf(stderr, "pmemobj_open: %s\n", pmemobj_errormsg());
return(-1);
hg_id_t id;
hg_bool_t flag;
margo_registered_name_mplex(mid, "bake_probe_rpc", mplex_id, &id, &flag);
if(flag == HG_TRUE) {
fprintf(stderr, "bake_provider_register(): a provider with the same mplex id (%d) already exists\n", mplex_id);
return -1;
}
}
/* check to make sure the root is properly set */
root_oid = pmemobj_root(pool, sizeof(bake_root_t));
root = pmemobj_direct(root_oid);
if(uuid_is_null(root->pool_id.id))
{
fprintf(stderr, "Error: BAKE pool is not properly initialized\n");
pmemobj_close(pool);
/* allocate the resulting structure */
tmp_svr_ctx = calloc(1,sizeof(*tmp_svr_ctx));
if(!tmp_svr_ctx)
return(-1);
}
#if 0
char target_string[64];
uuid_unparse(root->id, target_string);
fprintf(stderr, "opened BAKE target ID: %s\n", target_string);
#endif
/* register RPCs */
hg_id_t rpc_id;
rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_shutdown_rpc",
void, void, bake_shutdown_ult, mplex_id, abt_pool);
void, void, bake_shutdown_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
void* test = margo_registered_data_mplex(mid, rpc_id, mplex_id);
rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_create_rpc",
bake_create_in_t, bake_create_out_t,
bake_create_ult, mplex_id, abt_pool);
bake_create_in_t, bake_create_out_t,
bake_create_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_write_rpc",
bake_write_in_t, bake_write_out_t,
bake_write_ult, mplex_id, abt_pool);
bake_write_in_t, bake_write_out_t,
bake_write_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_eager_write_rpc",
bake_eager_write_in_t, bake_eager_write_out_t,
bake_eager_write_ult, mplex_id, abt_pool);
bake_eager_write_in_t, bake_eager_write_out_t,
bake_eager_write_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_eager_read_rpc",
bake_eager_read_in_t, bake_eager_read_out_t,
bake_eager_read_ult, mplex_id, abt_pool);
bake_eager_read_in_t, bake_eager_read_out_t,
bake_eager_read_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_persist_rpc",
bake_persist_in_t, bake_persist_out_t,
bake_persist_ult, mplex_id, abt_pool);
bake_persist_in_t, bake_persist_out_t,
bake_persist_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_create_write_persist_rpc",
bake_create_write_persist_in_t, bake_create_write_persist_out_t,
bake_create_write_persist_ult, mplex_id, abt_pool);
bake_create_write_persist_in_t, bake_create_write_persist_out_t,
bake_create_write_persist_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_get_size_rpc",
bake_get_size_in_t, bake_get_size_out_t,
bake_get_size_ult, mplex_id, abt_pool);
bake_get_size_in_t, bake_get_size_out_t,
bake_get_size_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_read_rpc",
bake_read_in_t, bake_read_out_t,
bake_read_ult, mplex_id, abt_pool);
bake_read_in_t, bake_read_out_t,
bake_read_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_probe_rpc",
bake_probe_in_t, bake_probe_out_t, bake_probe_ult,
mplex_id, abt_pool);
bake_probe_in_t, bake_probe_out_t, bake_probe_ult,
mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_noop_rpc",
void, void, bake_noop_ult, mplex_id, abt_pool);
void, void, bake_noop_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
/* install the bake server finalize callback */
margo_push_finalize_callback(mid, &bake_server_finalize_cb, tmp_svr_ctx);
/* set global server context */
tmp_svr_ctx->pmem_pool = pool;
tmp_svr_ctx->pmem_root = root;
if(provider != BAKE_PROVIDER_IGNORE)
*provider = tmp_svr_ctx;
return(0);
}
int bake_provider_add_storage_target(
bake_provider_t provider,
const char *target_name,
bake_target_id_t* target_id)
{
bake_pmem_entry_t* new_entry = calloc(1, sizeof(*new_entry));
new_entry->pmem_pool = pmemobj_open(target_name, NULL);
if(!(new_entry->pmem_pool)) {
fprintf(stderr, "pmemobj_open: %s\n", pmemobj_errormsg());
free(new_entry);
return -1;
}
/* check to make sure the root is properly set */
PMEMoid root_oid = pmemobj_root(new_entry->pmem_pool, sizeof(bake_root_t));
new_entry->pmem_root = pmemobj_direct(root_oid);
bake_target_id_t key = new_entry->pmem_root->pool_id;
new_entry->target_id = key;
if(uuid_is_null(key.id))
{
fprintf(stderr, "Error: BAKE pool %s is not properly initialized\n", target_name);
pmemobj_close(new_entry->pmem_pool);
free(new_entry);
return(-1);
}
/* insert in the provider's hash */
HASH_ADD(hh, provider->targets, target_id, sizeof(bake_target_id_t), new_entry);
/* check that it was inserted */
bake_pmem_entry_t* check_entry = NULL;
HASH_FIND(hh, provider->targets, &key, sizeof(bake_target_id_t), check_entry);
if(check_entry != new_entry) {
fprintf(stderr, "Error: BAKE could not insert new pmem pool into the hash\n");
pmemobj_close(new_entry->pmem_pool);
free(new_entry);
return -1;
}
provider->num_targets += 1;
*target_id = key;
return 0;
}
static bake_pmem_entry_t* find_pmem_entry(
bake_provider_t provider,
bake_target_id_t target_id)
{
bake_pmem_entry_t* entry = NULL;
HASH_FIND(hh, provider->targets, &target_id, sizeof(bake_target_id_t), entry);
return entry;
}
int bake_provider_remove_storage_target(
bake_provider_t provider,
bake_target_id_t target_id)
{
bake_pmem_entry_t* entry = NULL;
HASH_FIND(hh, provider->targets, &target_id, sizeof(bake_target_id_t), entry);
if(!entry) return -1;
pmemobj_close(entry->pmem_pool);
HASH_DEL(provider->targets, entry);
free(entry);
return 0;
}
int bake_provider_remove_all_storage_targets(
bake_provider_t provider)
{
bake_pmem_entry_t *p, *tmp;
HASH_ITER(hh, provider->targets, p, tmp) {
HASH_DEL(provider->targets, p);
pmemobj_close(p->pmem_pool);
free(p);
}
return 0;
}
int bake_provider_count_storage_targets(
bake_provider_t provider,
uint64_t* num_targets)
{
*num_targets = provider->num_targets;
return 0;
}