Commit 94b56161 authored by Matthieu Dorier's avatar Matthieu Dorier

done implementing multiplexing providers

parent c2e5c47c
......@@ -15,22 +15,55 @@
extern "C" {
#endif
#define BAKE_CLIENT_NULL ((bake_client_t)NULL)
#define BAKE_TARGET_ID_NULL ((bake_target_id_t)NULL)
typedef struct bake_client* bake_client_t;
typedef struct bake_target* bake_target_id_t;
/**
* Creates a BAKE client attached to the given margo instance.
* This will effectively register the RPC needed by BAKE into
* the margo instance. The client must be freed with
* bake_client_finalize.
*
* @param[in] mid margo instance
* @param[out] client resulting bake client object
*
* @return 0 on success, -1 on failure
*/
int bake_client_init(margo_instance_id mid, bake_client_t* client);
/**
* Finalizes a BAKE client.
* WARNING: This function must not be called after Margo has been
* finalized. If you need to finalize a BAKE client when Margo is
* finalized, use margo_push_finalize_callback.
*
* @param client BAKE client to destroy
*
* @return 0 on success, -1 on failure
*/
int bake_client_finalize(bake_client_t client);
/**
* Obtains identifying information for a BAKE target through the provided
* remote mercury address.
* remote mercury address and multiplex id.
*
* @param [in] mid margo instance
* @param [in] client BAKE client
* @param [in] dest_addr destination Mercury address
* @param [in] mplex_id multiplex id
* @param [out] bti BAKE target identifier
* @returns 0 on success, -1 on failure
*/
int bake_probe_instance(
margo_instance_id mid,
bake_client_t client,
hg_addr_t dest_addr,
uint8_t mplex_id,
bake_target_id_t *bti);
/**
* Creates a bounded-size BAKE data region. The resulting region can be
* Creates a bounded-size BAKE data region. The resulting region can be
* written using BAKE write operations, and can be persisted (once writes
* are complete) with a a BAKE persist operation. The region is not valid
* for read access until persisted.
......@@ -204,17 +237,19 @@ int bake_proxy_read(
*
* @param [in] bti BAKE target_identifier
*/
void bake_release_instance(
void bake_target_id_release(
bake_target_id_t bti);
/**
* Shuts down a remote BAKE service (given a target ID).
*
* @param [in] bti BAKE target identifier
* @returns 0 on success, -1 on fialure
* Shuts down a remote BAKE service (given an address).
* This will shutdown all the providers on the target address.
*
* @param [in] client BAKE client
* @param [in] addr address of the server
* @returns 0 on success, -1 on failure
*/
int bake_shutdown_service(
bake_target_id_t bti);
bake_client_t client, hg_addr_t addr);
/**
* Issues a BAKE no-op operation.
......
......@@ -15,18 +15,24 @@
extern "C" {
#endif
#define BAKE_ABT_POOL_DEFAULT ABT_POOL_NULL
#define BAKE_MPLEX_ID_DEFAULT 0
#define BAKE_PROVIDER_IGNORE NULL
typedef struct bake_server_context_t* bake_provider_t;
/**
* Creates a BAKE pool to use for backend PMEM storage.
*
* NOTE: This function must be called on a pool before the pool
* can be passed to 'bake_server_init'.
* can be passed to 'bake_provider_register'.
*
* @param[in] pool_name path to PMEM backend file
* @param[in] pool_size size of the created pool
* @param[in] pool_mode mode of the created pool
* @returns 0 on success, -1 otherwise
*/
int bake_server_makepool(
int bake_makepool(
const char *pool_name,
size_t pool_size,
mode_t pool_mode);
......@@ -35,12 +41,18 @@ int bake_server_makepool(
* Initializes a BAKE server instance.
*
* @param[in] mid Margo instance identifier
* @param[in] mplex_id Multiplex id
* @param[in] pool Pool on which to run the RPC handlers
* @param[in] pool_name path to PMEM backend file
* @param[out] provider resulting provider
* @returns 0 on success, -1 otherwise
*/
int bake_server_init(
int bake_provider_register(
margo_instance_id mid,
const char *pool_name);
uint32_t mplex_id,
ABT_pool pool,
const char *pool_name,
bake_provider_t* provider);
#ifdef __cplusplus
}
......
......@@ -14,14 +14,9 @@
extern "C" {
#endif
/**
* Persistent, universal, opaque identifier for a BAKE target.
* Remains constant if instance is opened, closed, or migrated.
*/
typedef struct {
uuid_t id;
} bake_target_id_t;
} bake_uuid_t;
/**
* Persistent, opaque identifier for a region within a BAKE target.
*/
......
......@@ -18,7 +18,7 @@
* all remote BAKE targets. In the future we probably need to support
* multiple in case we run atop more than one transport at a time.
*/
struct bake_margo_instance
struct bake_client
{
margo_instance_id mid;
......@@ -35,100 +35,108 @@ struct bake_margo_instance
hg_id_t bake_noop_id;
};
/* Refers to an instance connected to a specific BAKE target */
struct bake_instance
{
bake_target_id_t bti; /* persistent identifier for this target */
hg_addr_t dest; /* resolved Mercury address */
UT_hash_handle hh;
};
struct bake_instance *instance_hash = NULL;
struct bake_margo_instance g_margo_inst = {
.mid = MARGO_INSTANCE_NULL,
struct bake_target {
struct bake_client* client;
bake_uuid_t pool_id;
hg_addr_t dest;
uint8_t mplex_id;
};
/* XXX calling this function again just overwrites the previous global mid...
* need to be smarter if we truly want to support multiple client-side mids
*/
static int bake_margo_instance_init(margo_instance_id mid)
static int bake_client_register(bake_client_t client, margo_instance_id mid)
{
g_margo_inst.mid = mid;
client->mid = mid;
/* register RPCs */
g_margo_inst.bake_probe_id =
MARGO_REGISTER(g_margo_inst.mid, "bake_probe_rpc",
client->bake_probe_id =
MARGO_REGISTER(mid, "bake_probe_rpc",
void, bake_probe_out_t, NULL);
g_margo_inst.bake_shutdown_id =
MARGO_REGISTER(g_margo_inst.mid, "bake_shutdown_rpc",
client->bake_shutdown_id =
MARGO_REGISTER(mid, "bake_shutdown_rpc",
void, void, NULL);
g_margo_inst.bake_create_id =
MARGO_REGISTER(g_margo_inst.mid, "bake_create_rpc",
client->bake_create_id =
MARGO_REGISTER(mid, "bake_create_rpc",
bake_create_in_t, bake_create_out_t, NULL);
g_margo_inst.bake_write_id =
MARGO_REGISTER(g_margo_inst.mid, "bake_write_rpc",
client->bake_write_id =
MARGO_REGISTER(mid, "bake_write_rpc",
bake_write_in_t, bake_write_out_t, NULL);
g_margo_inst.bake_eager_write_id =
MARGO_REGISTER(g_margo_inst.mid, "bake_eager_write_rpc",
client->bake_eager_write_id =
MARGO_REGISTER(mid, "bake_eager_write_rpc",
bake_eager_write_in_t, bake_eager_write_out_t, NULL);
g_margo_inst.bake_eager_read_id =
MARGO_REGISTER(g_margo_inst.mid, "bake_eager_read_rpc",
client->bake_eager_read_id =
MARGO_REGISTER(mid, "bake_eager_read_rpc",
bake_eager_read_in_t, bake_eager_read_out_t, NULL);
g_margo_inst.bake_persist_id =
MARGO_REGISTER(g_margo_inst.mid, "bake_persist_rpc",
client->bake_persist_id =
MARGO_REGISTER(mid, "bake_persist_rpc",
bake_persist_in_t, bake_persist_out_t, NULL);
g_margo_inst.bake_create_write_persist_id =
MARGO_REGISTER(g_margo_inst.mid, "bake_create_write_persist_rpc",
client->bake_create_write_persist_id =
MARGO_REGISTER(mid, "bake_create_write_persist_rpc",
bake_create_write_persist_in_t, bake_create_write_persist_out_t, NULL);
g_margo_inst.bake_get_size_id =
MARGO_REGISTER(g_margo_inst.mid, "bake_get_size_rpc",
client->bake_get_size_id =
MARGO_REGISTER(mid, "bake_get_size_rpc",
bake_get_size_in_t, bake_get_size_out_t, NULL);
g_margo_inst.bake_read_id =
MARGO_REGISTER(g_margo_inst.mid, "bake_read_rpc",
client->bake_read_id =
MARGO_REGISTER(mid, "bake_read_rpc",
bake_read_in_t, bake_read_out_t, NULL);
g_margo_inst.bake_noop_id =
MARGO_REGISTER(g_margo_inst.mid, "bake_noop_rpc",
client->bake_noop_id =
MARGO_REGISTER(mid, "bake_noop_rpc",
void, void, NULL);
return(0);
}
int bake_client_init(margo_instance_id mid, bake_client_t* client)
{
bake_client_t c = (bake_client_t)calloc(1, sizeof(*c));
if(!c) return -1;
int ret = bake_client_register(c, mid);
if(ret != 0) return ret;
*client = c;
return 0;
}
int bake_client_finalize(bake_client_t client)
{
free(client);
return 0;
}
int bake_probe_instance(
margo_instance_id mid,
bake_client_t client,
hg_addr_t dest_addr,
uint8_t mplex_id,
bake_target_id_t *bti)
{
hg_return_t hret;
int ret;
bake_probe_out_t out;
hg_handle_t handle;
struct bake_instance *new_instance;
ret = bake_margo_instance_init(mid);
if(ret < 0)
return(ret);
struct bake_target *new_target;
new_instance = calloc(1, sizeof(*new_instance));
if(!new_instance)
new_target = calloc(1, sizeof(*new_target));
if(!new_target)
return(-1);
hret = margo_addr_dup(g_margo_inst.mid, dest_addr, &new_instance->dest);
new_target->client = client;
new_target->mplex_id = mplex_id;
hret = margo_addr_dup(client->mid, dest_addr, &new_target->dest);
if(hret != HG_SUCCESS)
{
free(new_instance);
free(new_target);
return(-1);
}
/* create handle */
hret = margo_create(g_margo_inst.mid, new_instance->dest,
g_margo_inst.bake_probe_id, &handle);
hret = margo_create(client->mid, new_target->dest,
client->bake_probe_id, &handle);
margo_set_target_id(handle, mplex_id);
if(hret != HG_SUCCESS)
{
margo_addr_free(g_margo_inst.mid, new_instance->dest);
free(new_instance);
margo_addr_free(client->mid, new_target->dest);
free(new_target);
return(-1);
}
......@@ -136,8 +144,8 @@ int bake_probe_instance(
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
margo_addr_free(g_margo_inst.mid, new_instance->dest);
free(new_instance);
margo_addr_free(client->mid, new_target->dest);
free(new_target);
return(-1);
}
......@@ -145,60 +153,45 @@ int bake_probe_instance(
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
margo_addr_free(g_margo_inst.mid, new_instance->dest);
free(new_instance);
margo_addr_free(client->mid, new_target->dest);
free(new_target);
return(-1);
}
ret = out.ret;
*bti = out.bti;
new_instance->bti = out.bti;
new_target->pool_id = out.pool_id;
margo_free_output(handle, &out);
margo_destroy(handle);
if(ret != 0)
{
margo_addr_free(g_margo_inst.mid, new_instance->dest);
free(new_instance);
}
else
{
/* TODO: safety check that it isn't already there. Here or earlier? */
HASH_ADD(hh, instance_hash, bti, sizeof(new_instance->bti), new_instance);
margo_addr_free(client->mid, new_target->dest);
free(new_target);
} else {
*bti = new_target;
}
return(ret);
}
void bake_release_instance(
void bake_target_id_release(
bake_target_id_t bti)
{
struct bake_instance *instance = NULL;
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
return;
HASH_DELETE(hh, instance_hash, instance);
margo_addr_free(g_margo_inst.mid, instance->dest);
free(instance);
margo_addr_free(bti->client->mid, bti->dest);
free(bti);
return;
}
int bake_shutdown_service(bake_target_id_t bti)
int bake_shutdown_service(bake_client_t client, hg_addr_t addr)
{
hg_return_t hret;
struct bake_instance *instance = NULL;
hg_handle_t handle;
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
return(-1);
hret = margo_create(client->mid, addr,
client->bake_shutdown_id, &handle);
hret = margo_create(g_margo_inst.mid, instance->dest,
g_margo_inst.bake_shutdown_id, &handle);
if(hret != HG_SUCCESS)
return(-1);
......@@ -225,20 +218,17 @@ static int bake_eager_write(
bake_eager_write_in_t in;
bake_eager_write_out_t out;
int ret;
struct bake_instance *instance = NULL;
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
return(-1);
in.bti = bti;
in.pool_id = bti->pool_id;
in.rid = rid;
in.region_offset = region_offset;
in.size = buf_size;
in.buffer = (char*)buf;
hret = margo_create(g_margo_inst.mid, instance->dest,
g_margo_inst.bake_eager_write_id, &handle);
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_eager_write_id, &handle);
margo_set_target_id(handle, bti->mplex_id);
if(hret != HG_SUCCESS)
return(-1);
......@@ -276,29 +266,26 @@ int bake_write(
bake_write_in_t in;
bake_write_out_t out;
int ret;
struct bake_instance *instance = NULL;
if(buf_size <= BAKE_EAGER_LIMIT)
return(bake_eager_write(bti, rid, region_offset, buf, buf_size));
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
return(-1);
in.bti = bti;
in.pool_id = bti->pool_id;
in.rid = rid;
in.region_offset = region_offset;
in.bulk_offset = 0;
in.bulk_size = buf_size;
in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy write */
hret = margo_bulk_create(g_margo_inst.mid, 1, (void**)(&buf), &buf_size,
hret = margo_bulk_create(bti->client->mid, 1, (void**)(&buf), &buf_size,
HG_BULK_READ_ONLY, &in.bulk_handle);
if(hret != HG_SUCCESS)
return(-1);
hret = margo_create(g_margo_inst.mid, instance->dest,
g_margo_inst.bake_write_id, &handle);
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_write_id, &handle);
margo_set_target_id(handle, bti->mplex_id);
if(hret != HG_SUCCESS)
{
margo_bulk_free(in.bulk_handle);
......@@ -342,14 +329,9 @@ int bake_proxy_write(
hg_handle_t handle;
bake_write_in_t in;
bake_write_out_t out;
struct bake_instance *instance = NULL;
int ret;
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
return(-1);
in.bti = bti;
in.pool_id = bti->pool_id;
in.rid = rid;
in.region_offset = region_offset;
in.bulk_handle = remote_bulk;
......@@ -357,8 +339,10 @@ int bake_proxy_write(
in.bulk_size = size;
in.remote_addr_str = (char*)remote_addr;
hret = margo_create(g_margo_inst.mid, instance->dest,
g_margo_inst.bake_write_id, &handle);
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_write_id, &handle);
margo_set_target_id(handle, bti->mplex_id);
if(hret != HG_SUCCESS)
return(-1);
......@@ -392,20 +376,19 @@ int bake_create(
hg_handle_t handle;
bake_create_in_t in;
bake_create_out_t out;
int ret;
struct bake_instance *instance = NULL;
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
return(-1);
int ret = 0;
in.bti = bti;
in.pool_id = bti->pool_id;
in.region_size = region_size;
hret = margo_create(g_margo_inst.mid, instance->dest,
g_margo_inst.bake_create_id, &handle);
if(hret != HG_SUCCESS)
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_create_id, &handle);
if(hret != HG_SUCCESS) {
return(-1);
}
margo_set_target_id(handle, bti->mplex_id);
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS)
......@@ -439,17 +422,14 @@ int bake_persist(
bake_persist_in_t in;
bake_persist_out_t out;
int ret;
struct bake_instance *instance = NULL;
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
return(-1);
in.bti = bti;
in.pool_id = bti->pool_id;
in.rid = rid;
hret = margo_create(g_margo_inst.mid, instance->dest,
g_margo_inst.bake_persist_id, &handle);
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_persist_id, &handle);
margo_set_target_id(handle, bti->mplex_id);
if(hret != HG_SUCCESS)
return(-1);
......@@ -487,28 +467,25 @@ int bake_create_write_persist(
bake_create_write_persist_in_t in;
bake_create_write_persist_out_t out;
int ret;
struct bake_instance *instance = NULL;
/* XXX eager path? */
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
return(-1);
in.bti = bti;
in.pool_id = bti->pool_id;
in.region_size = region_size;
in.region_offset = region_offset;
in.bulk_offset = 0;
in.bulk_size = buf_size;
in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy write */
hret = margo_bulk_create(g_margo_inst.mid, 1, (void**)(&buf), &buf_size,
hret = margo_bulk_create(bti->client->mid, 1, (void**)(&buf), &buf_size,
HG_BULK_READ_ONLY, &in.bulk_handle);
if(hret != HG_SUCCESS)
return(-1);
hret = margo_create(g_margo_inst.mid, instance->dest,
g_margo_inst.bake_create_write_persist_id, &handle);
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_create_write_persist_id, &handle);
margo_set_target_id(handle, bti->mplex_id);
if(hret != HG_SUCCESS)
{
margo_bulk_free(in.bulk_handle);
......@@ -556,13 +533,8 @@ int bake_create_write_persist_proxy(
bake_create_write_persist_in_t in;
bake_create_write_persist_out_t out;
int ret;
struct bake_instance *instance = NULL;
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
return(-1);
in.bti = bti;
in.pool_id = bti->pool_id;
in.region_size = region_size;
in.region_offset = region_offset;
in.bulk_handle = remote_bulk;
......@@ -570,8 +542,10 @@ int bake_create_write_persist_proxy(
in.bulk_size = size;
in.remote_addr_str = (char*)remote_addr;
hret = margo_create(g_margo_inst.mid, instance->dest,
g_margo_inst.bake_create_write_persist_id, &handle);
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_create_write_persist_id, &handle);
margo_set_target_id(handle, bti->mplex_id);
if(hret != HG_SUCCESS)
return(-1);
......@@ -608,17 +582,14 @@ int bake_get_size(
bake_get_size_in_t in;
bake_get_size_out_t out;
int ret;
struct bake_instance *instance = NULL;
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
return(-1);
in.bti = bti;
in.pool_id = bti->pool_id;
in.rid = rid;
hret = margo_create(g_margo_inst.mid, instance->dest,
g_margo_inst.bake_get_size_id, &handle);
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_get_size_id, &handle);
margo_set_target_id(handle, bti->mplex_id);
if(hret != HG_SUCCESS)
return(-1);
......@@ -649,14 +620,11 @@ int bake_noop(
{
hg_return_t hret;
hg_handle_t handle;
struct bake_instance *instance = NULL;
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
return(-1);
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_noop_id, &handle);
margo_set_target_id(handle, bti->mplex_id);
hret = margo_create(g_margo_inst.mid, instance->dest,
g_margo_inst.bake_noop_id, &handle);
if(hret != HG_SUCCESS)
return(-1);
......@@ -683,19 +651,16 @@ static int bake_eager_read(
bake_eager_read_in_t in;
bake_eager_read_out_t out;
int ret;
struct bake_instance *instance = NULL;
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
return(-1);
in.bti = bti;
in.pool_id = bti->pool_id;
in.rid = rid;
in.region_offset = region_offset;
in.size = buf_size;
hret = margo_create(g_margo_inst.mid, instance->dest,
g_margo_inst.bake_eager_read_id, &handle);
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_eager_read_id, &handle);
margo_set_target_id(handle, bti->mplex_id);
if(hret != HG_SUCCESS)
return(-1);
......@@ -734,29 +699,26 @@ int bake_read(
bake_read_in_t in;
bake_read_out_t out;
int ret;
struct bake_instance *instance = NULL;
if(buf_size <= BAKE_EAGER_LIMIT)
return(bake_eager_read(bti, rid, region_offset, buf, buf_size));
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
return(-1);
in.bti = bti;
in.pool_id = bti->pool_id;
in.rid = rid;
in.region_offset = region_offset;
in.bulk_offset = 0;
in.bulk_size = buf_size;
in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy read */
hret = margo_bulk_create(g_margo_inst.mid, 1, (void**)(&buf), &buf_size,
hret = margo_bulk_create(bti->client->mid, 1, (void**)(&buf), &buf_size,
HG_BULK_WRITE_ONLY, &in.bulk_handle);
if(hret != HG_SUCCESS)
return(-1);
hret = margo_create(g_margo_inst.mid, instance->dest,
g_margo_inst.bake_read_id, &handle);
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_read_id, &handle);
margo_set_target_id(handle, bti->mplex_id);
if(hret != HG_SUCCESS)
{
margo_bulk_free(in.bulk_handle);
......@@ -800,14 +762,9 @@ int bake_proxy_read(
hg_handle_t handle;
bake_read_in_t in;
bake_read_out_t out;
struct bake_instance *instance = NULL;
int ret;
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
return(-1);
in.bti = bti;
in.pool_id = bti->pool_id;
in.rid = rid;
in.region_offset = region_offset;
in.bulk_handle = remote_bulk;
......@@ -815,8 +772,10 @@ int bake_proxy_read(
in.bulk_size = size;
in.remote_addr_str = (char*)remote_addr;
hret = margo_create(g_margo_inst.mid, instance->dest,
g_margo_inst.bake_read_id, &handle);
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_read_id, &handle);
margo_set_target_id(handle, bti->mplex_id);
if(hret != HG_SUCCESS)
return(-1);
......